In [2]:
import os
import json
import torch
import numpy as np
from sentence_transformers import SentenceTransformer, util
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers.utils.quantization_config import BitsAndBytesConfig


# --- Locations of data and model artifacts (relative paths) ---
DATA_PROCESSED_PATH = "../data/processed"
FINETUNED_RETRIEVER_PATH = "../models/retriever_finetuned/best-model"
LOCAL_GENERATOR_PATH = "../models/generator_qwen"

WIKIPEDIA_CHUNKS_FILE = "wikipedia_chunks.jsonl"
CORPUS_EMBEDDINGS_FILE = "corpus_embeddings_finetuned.npy"

# --- Generator quantization settings ---
USE_GENERATOR_QUANTIZATION_ON_LOAD = True
GENERATOR_QUANTIZATION_TYPE_ON_LOAD = "int8"

# --- Retrieval / generation settings ---
TOP_K_RETRIEVER = 3
MAX_CONTEXT_TOKENS_FOR_GENERATOR = 2048

# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {DEVICE}")

wikipedia_chunks_path = os.path.join(DATA_PROCESSED_PATH, WIKIPEDIA_CHUNKS_FILE)
corpus_embeddings_path = os.path.join(DATA_PROCESSED_PATH, CORPUS_EMBEDDINGS_FILE)

# Fail fast, in a fixed order, if any required artifact is missing on disk.
for _missing_message, _required_path in (
    ("Fine-tuned retriever not found", FINETUNED_RETRIEVER_PATH),
    ("Generator not found locally", LOCAL_GENERATOR_PATH),
    ("Wikipedia corpus file not found", wikipedia_chunks_path),
    ("Corpus embeddings file not found", corpus_embeddings_path),
):
    if not os.path.exists(_required_path):
        raise FileNotFoundError(f"{_missing_message}: {_required_path}")

Using device: cuda


In [3]:
# Load the fine-tuned sentence-transformers model used to embed queries.
retriever_model = SentenceTransformer(FINETUNED_RETRIEVER_PATH, device=DEVICE)
print("Retriever loaded.")

# The corpus is JSON Lines: one passage object per line. Blank lines (e.g. a
# trailing newline at end of file) are skipped instead of crashing json.loads.
with open(wikipedia_chunks_path, "r", encoding="utf-8") as f:
    corpus_passages_data = [json.loads(line) for line in f if line.strip()]
print(f"Loaded {len(corpus_passages_data)} passages.")

corpus_embeddings = np.load(corpus_embeddings_path)
print(f"Corpus embeddings loaded. Shape: {corpus_embeddings.shape}")

# Retrieval looks passages up by their embedding row index, so the two
# collections must have the same length; otherwise results would be wrong.
if corpus_embeddings.shape[0] != len(corpus_passages_data):
    raise ValueError(
        f"Corpus/embedding mismatch: {len(corpus_passages_data)} passages but "
        f"{corpus_embeddings.shape[0]} embedding rows."
    )

Retriever loaded.
Loaded 36508 passages.
Corpus embeddings loaded. Shape: (36508, 768)


In [4]:
generator_tokenizer = AutoTokenizer.from_pretrained(
    LOCAL_GENERATOR_PATH, use_fast=True, trust_remote_code=True
)
# The pad token id is handed to generate() later; fall back to EOS when the
# tokenizer does not define one.
if generator_tokenizer.pad_token_id is None:
    generator_tokenizer.pad_token_id = generator_tokenizer.eos_token_id
print("Generator's tokenizer loaded from local path.")

# Build the quantization config from the notebook-level settings instead of
# hard-coding 8-bit, so flipping the config constants actually takes effect.
quantization_config_gen = None
if USE_GENERATOR_QUANTIZATION_ON_LOAD:
    if GENERATOR_QUANTIZATION_TYPE_ON_LOAD == "int8":
        quantization_config_gen = BitsAndBytesConfig(load_in_8bit=True)
    elif GENERATOR_QUANTIZATION_TYPE_ON_LOAD == "int4":
        quantization_config_gen = BitsAndBytesConfig(load_in_4bit=True)
    else:
        raise ValueError(
            "Unsupported GENERATOR_QUANTIZATION_TYPE_ON_LOAD: "
            f"{GENERATOR_QUANTIZATION_TYPE_ON_LOAD!r} (expected 'int8' or 'int4')"
        )

model_gen_kwargs = {"device_map": "auto", "trust_remote_code": True}
if quantization_config_gen is not None:
    model_gen_kwargs["quantization_config"] = quantization_config_gen

generator_model = AutoModelForCausalLM.from_pretrained(LOCAL_GENERATOR_PATH, **model_gen_kwargs)
print("Generator loaded from local path.")

Generator's tokenizer loaded from local path.




Generator loaded from local path.


In [15]:
def retrieve_contexts(query_text, top_k=TOP_K_RETRIEVER):
    """Return the corpus passages most similar to ``query_text``.

    Args:
        query_text: Query handed to ``retriever_model.encode``.
        top_k: Maximum number of passages to return. Values ``<= 0`` yield
            empty results; values at or above the corpus size return every
            passage.

    Returns:
        A ``(passages, scores)`` tuple: entries of ``corpus_passages_data``
        ordered by descending cosine similarity, and the matching similarity
        scores as plain Python floats.
    """
    # Guard first: a negative top_k would otherwise turn the ``[:top_k]``
    # slice below into "everything but the last |top_k| entries".
    if top_k <= 0:
        return [], []

    query_embedding = retriever_model.encode(query_text, convert_to_numpy=True)
    if query_embedding.ndim == 1:
        query_embedding = query_embedding.reshape(1, -1)

    # Only the first row of the similarity matrix (the first query) is used.
    # .cpu() is a no-op for CPU tensors and keeps .numpy() safe otherwise.
    cosine_scores = util.cos_sim(query_embedding, corpus_embeddings)[0].cpu().numpy()

    if top_k >= len(cosine_scores):
        # Asking for the whole corpus (or more): a full descending sort.
        top_k_indices = np.argsort(cosine_scores)[::-1][:top_k]
    else:
        # Select the top_k best candidates without sorting the whole corpus,
        # then order just those candidates by descending score.
        candidate_indices = np.argpartition(-cosine_scores, top_k - 1)[:top_k]
        top_k_indices = candidate_indices[np.argsort(-cosine_scores[candidate_indices])]

    retrieved_passages = [corpus_passages_data[idx] for idx in top_k_indices]
    retrieved_scores = [float(cosine_scores[idx]) for idx in top_k_indices]
    return retrieved_passages, retrieved_scores


def format_rag_prompt(query_text, retrieved_contexts_data):
    """Build the context-grounded prompt for the generator.

    Args:
        query_text: The user's question.
        retrieved_contexts_data: Iterable of passage dicts; each must provide
            a ``'passage_text'`` key.

    Returns:
        A ``(formatted_prompt, full_prompt_content)`` tuple: the prompt string
        produced by ``generator_tokenizer.apply_chat_template`` (not tokenized,
        generation prompt appended) and the raw user-message text it wraps.
    """
    context_str = "".join(
        f"Context [{rank}]: {context_data['passage_text']}\n\n"
        for rank, context_data in enumerate(retrieved_contexts_data, start=1)
    )

    # Adjacent string literals are concatenated verbatim, so every sentence
    # boundary needs its own explicit trailing space.
    instruction = (
        "Answer the given question USING ONLY information in provided contexts. "
        "If the exact answer doesn't appear in provided context, then EXACTLY THIS: "
        "'Sorry, I don't know the answer based on the articles provided.' "
        "and don't say anything after that\n\n"
    )

    full_prompt_content = f"{instruction}Provided context:\n{context_str}Question: {query_text}"

    messages = [{"role": "user", "content": full_prompt_content}]
    formatted_prompt = generator_tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    return formatted_prompt, full_prompt_content


def generate_answer_with_rag(
    query_text,
    max_new_tokens=250,
    temperature=0.1,
    top_p=0.9,
    do_sample=False,
    print_passages=True,
    print_details=True,
    top_k=None,
):
    """Answer ``query_text`` with retrieval-augmented generation.

    Retrieves passages with ``retrieve_contexts``, builds a prompt with
    ``format_rag_prompt`` and lets ``generator_model`` generate the answer.

    Args:
        query_text: The user's question.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature; only applied when ``do_sample``.
        top_p: Nucleus-sampling threshold; only applied when ``do_sample``.
        do_sample: Passed through to ``generate``; when false, ``temperature``
            and ``top_p`` are replaced by 1.0.
        print_passages: Print the retrieved passages before generating.
        print_details: With ``print_passages``, also print rank, passage id,
            score and document title for every passage.
        top_k: Number of passages to retrieve; ``None`` means
            ``TOP_K_RETRIEVER``.

    Returns:
        The decoded text of the newly generated tokens, or a fixed apology
        string when retrieval returned nothing.
    """
    if top_k is None:
        top_k = TOP_K_RETRIEVER

    retrieved_contexts, scores = retrieve_contexts(query_text, top_k=top_k)
    if not retrieved_contexts:
        return "Sorry, I failed to find any articles to answer this question."

    if print_passages:
        print("Retrieved passages:")
        for rank, (ctx, score) in enumerate(zip(retrieved_contexts, scores), start=1):
            if print_details:
                print(
                    f"  (Rank: {rank}) (PassageID: {ctx['passage_id']}) (Score: {score:.4f}) (Title: {ctx['document_title']}) \"{ctx['passage_text']}\""
                )
            else:
                print(f"  {ctx['passage_text']}")

    rag_prompt_formatted, _ = format_rag_prompt(query_text, retrieved_contexts)

    # Put the inputs on the model's own device rather than the global DEVICE:
    # the model is loaded with device_map="auto", so its placement is decided
    # at load time and is not guaranteed to equal DEVICE.
    inputs = generator_tokenizer(
        rag_prompt_formatted, return_tensors="pt", padding=False, truncation=False
    ).to(generator_model.device)
    prompt_length = inputs.input_ids.shape[-1]

    with torch.no_grad():
        outputs = generator_model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            # The sampling knobs are only meaningful when do_sample is true;
            # otherwise pass the neutral value 1.0.
            temperature=temperature if do_sample else 1.0,
            top_p=top_p if do_sample else 1.0,
            do_sample=do_sample,
            pad_token_id=generator_tokenizer.pad_token_id,
            eos_token_id=generator_tokenizer.eos_token_id,
        )

    # generate() returns prompt + continuation; keep only the continuation.
    response_text = generator_tokenizer.decode(
        outputs[0][prompt_length:], skip_special_tokens=True
    )
    return response_text

In [18]:
# Smoke test: send one question through the full RAG pipeline and show the answer.
test_query_rag1 = "what is the title of the second harry potter book"
print("Question:", test_query_rag1, end="\n\n")
res = generate_answer_with_rag(query_text=test_query_rag1, print_details=False)
print()
print("Response:", res)

# print("\n" + "#"*70 + "\n")

# test_query_rag2 = "What is the main export product of Japan according to the provided texts?"
# print(f"Question: {test_query_rag2}\n")
# res = generate_answer_with_rag(test_query_rag2)
# print(f"\nResponse: {res}")

# print("\n" + "#"*70 + "\n")

# test_query_rag3 = "Who is the current president of Mars based on the documents?" # question with no answer in the wiki corpus
# print(f"Question: {test_query_rag3}\n")
# res = generate_answer_with_rag(test_query_rag3, print_details=False)
# print(f"\nResponse: {res}")

Question: what is the title of the second harry potter book

Retrived passages:
  The second book, Harry Potter and the Chamber of Secrets, was originally published in the UK on 2 July 1998 and in the US on 2 June 1999. Harry Potter and the Prisoner of Azkaban was published a year later in the UK on 8 July 1999 and in the US on 8 September 1999. Harry Potter and the Goblet of Fire was published on 8 July 2000 at the same time by Bloomsbury and Scholastic. Harry Potter and the Order of the Phoenix is the longest book in the series, at 766 pages in the UK version and 870 pages in the US version. It was published worldwide in English on 21 June 2003. Harry Potter and the Half-Blood Prince was published on 16 July 2005. The seventh and final novel, Harry Potter and the Deathly Hallows, was published on 21 July 2007. Rowling herself has stated that the last chapter of the final book (in fact, the epilogue) was completed "in something like 1990". Rowling retained rights to digital editions a