<a href="https://colab.research.google.com/github/Vihith123/RAG_Application/blob/main/NutriChatBot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Perform Google Colab installs (if running in Google Colab)
import os

# The presence of the COLAB_GPU environment variable is used as the "running in Colab" marker;
# when it is absent this cell installs nothing.
# NOTE(review): none of the packages below are version-pinned, so results may drift between runs.
if "COLAB_GPU" in os.environ:
    print("[INFO] Running in Google Colab, installing requirements.")
    #!pip install -U torch # requires torch 2.1.1+ (for efficient sdpa implementation)
    !pip install PyMuPDF # for reading PDFs with Python
    !pip install tqdm # for progress bars
    !pip install sentence-transformers # for embedding models
    !pip install accelerate # for quantization model loading
    !pip install bitsandbytes # for quantizing models (less storage space)
    !pip install flash-attn --no-build-isolation # for faster attention mechanism = faster LLM inference


In [None]:
# Remove the currently installed PyTorch / transformers / sentence-transformers packages,
# reinstall PyTorch from the CUDA 12.1 wheel index, then install the latest
# transformers and sentence-transformers.
# NOTE(review): this cell runs unconditionally (it is not guarded by the COLAB_GPU check) - confirm that is intended.
!pip uninstall -y torch torchvision torchaudio transformers sentence-transformers

!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

!pip install -U transformers sentence-transformers


In [None]:
# Download PDF file
import os
import requests

# Local path where the PDF document is stored
pdf_path = "human-nutrition-text.pdf"

# Download PDF if it doesn't already exist
if not os.path.exists(pdf_path):
    print("File doesn't exist, downloading...")

    # The URL of the PDF you want to download
    url = "https://pressbooks.oer.hawaii.edu/humannutrition2/open/download?type=pdf"

    # The local filename to save the downloaded file
    filename = pdf_path

    # Send a GET request to the URL.
    # An explicit timeout keeps a stalled connection from hanging the notebook indefinitely
    # (requests applies no timeout by default).
    response = requests.get(url, timeout=60)

    # Check if the request was successful
    if response.status_code == 200:
        # Open a file in binary write mode and save the content to it
        with open(filename, "wb") as file:
            file.write(response.content)
        print(f"The file has been downloaded and saved as {filename}")
    else:
        print(f"Failed to download the file. Status code: {response.status_code}")
else:
    print(f"File {pdf_path} exists.")


In [None]:
import fitz # This is the PyMuPdf
from tqdm.auto import tqdm # This Helps in faster iteration
def text_formatter(text: str) ->str:
  """Return `text` with every newline replaced by a space and outer whitespace trimmed."""
  return text.replace("\n"," ").strip()

# Now we will open the pdf
def open_and_read_pdf(pdf_path: str) ->list:
  """
  Read every page of the PDF at `pdf_path` and collect per-page text statistics.

  Args:
    pdf_path: Path of the PDF file to open with PyMuPDF.

  Returns:
    A list with one dict per page, in page order, holding the 1-based
    "page_number", character / word / sentence counts, a rough
    "Page_token_count" (characters / 4) and the cleaned page "text".
  """
  pages_texts = [] # one record per page
  # Open the document as a context manager so the file handle is released
  # even if text extraction fails part-way through.
  with fitz.open(pdf_path) as doc:
    # Passing the page count lets tqdm show real progress instead of a bare counter.
    for page_number, page in tqdm(enumerate(doc), total=len(doc)):
      text = text_formatter(page.get_text())
      pages_texts.append({"page_number": page_number + 1,  # enumerate is 0-based
                          "page_char_count": len(text),
                          "page_word_count": len(text.split(" ")),
                          "page_sentence_count": len(text.split(". ")),
                          "Page_token_count": len(text)/4,  # rough estimate: 1 token ~ 4 characters
                          "text": text
                          })
  return pages_texts

# Extract the per-page records from the PDF and preview the first three of them
pages_texts = open_and_read_pdf(pdf_path = pdf_path)
pages_texts[:3]

In [None]:
import pandas as pd
# Load the per-page records into a DataFrame and preview its first rows
df = pd.DataFrame(pages_texts)
df.head()

In [None]:
# Summary statistics of the per-page DataFrame, rounded to two decimals
df.describe().round(2)

In [None]:
from spacy.lang.en import English
# English pipeline with a "sentencizer" component added to it
nlp = English()
nlp.add_pipe("sentencizer") # Splits a paragraph into individual sentences
# Sanity check: the two-sentence sample must be split into exactly two sentences
doc = nlp("This is Vihith. I am a chess Player")
assert len(list(doc.sents))==2
list(doc.sents)

In [None]:
# Attach the spaCy sentence split (as plain strings) and its length to every page record
for item in tqdm(pages_texts):
    # str() turns each spaCy sentence span into an ordinary Python string
    item["sentences"] = [str(sentence) for sentence in nlp(item["text"]).sents]

    # Number of sentences spaCy found on this page
    item["page_sentence_count_spacy"] = len(item["sentences"])


In [None]:
import random
# Spot-check one randomly chosen page record
random.sample(pages_texts,k=1)

In [None]:
# Number of sentences grouped into one chunk (the last chunk of a page may be shorter)
num_sentence_chunk_size = 10

# Create a function that splits a list into consecutive slices of the desired size
def split_list(input_list: list,
               slice_size: int) -> list[list[str]]:
    """
    Cut input_list into consecutive sublists holding at most slice_size items each.

    Only the final sublist can be shorter, e.g. 17 sentences with slice_size=10
    give one sublist of 10 sentences followed by one of 7.
    """
    chunks = []
    for start in range(0, len(input_list), slice_size):
        chunks.append(input_list[start:start + slice_size])
    return chunks

# For every page, group its sentences into chunks and record how many chunks resulted
for item in tqdm(pages_texts):
    item["sentence_chunks"] = split_list(item["sentences"], num_sentence_chunk_size)
    item["num_chunks"] = len(item["sentence_chunks"])


In [None]:
import re

# Flatten the pages into one record per sentence chunk
pages_and_chunks = []
for item in tqdm(pages_texts):
    for sentence_chunk in item["sentence_chunks"]:
        # Merge the chunk's sentences into a single paragraph-like string
        joined_sentence_chunk = " ".join(sentence_chunk).replace("  ", " ").strip()
        # Re-insert the space after a full stop that is glued to a capital letter: ".A" -> ". A"
        joined_sentence_chunk = re.sub(r'\.([A-Z])', r'. \1', joined_sentence_chunk)

        # Store the chunk together with its source page and simple size statistics
        pages_and_chunks.append({
            "page_number": item["page_number"],
            "sentence_chunk": joined_sentence_chunk,
            "chunk_char_count": len(joined_sentence_chunk),
            "chunk_word_count": len(joined_sentence_chunk.split(" ")),
            "chunk_token_count": len(joined_sentence_chunk) // 4,  # 1 token = ~4 characters
        })

# How many chunks do we have?
len(pages_and_chunks)



In [None]:
import random
# Spot-check one randomly chosen chunk record
random.sample(pages_and_chunks, k =1)

In [None]:
# Rebind `df` to a DataFrame of the chunk records and show its summary statistics (two decimals)
df = pd.DataFrame(pages_and_chunks)
df.describe().round(2)

In [None]:
# Chunks shorter than this many characters are dropped
min_chunk_char_count = 30
pages_and_chunks_over_min_length = list(
    filter(lambda chunk: chunk["chunk_char_count"] >= min_chunk_char_count, pages_and_chunks)
)

# Report how many chunks the filter kept
print(f"Number of chunks before filtering: {len(pages_and_chunks)}")
print(f"Number of chunks after filtering (min char count >= {min_chunk_char_count}): {len(pages_and_chunks_over_min_length)}")


In [None]:
# Preview the first two chunk records that passed the length filter
pages_and_chunks_over_min_length[:2]

### Set up your Google API Key

To use the Gemini API, you'll need an API key. If you don't already have one, create a key in [Google AI Studio](https://aistudio.google.com/app/apikey).

In Colab, add the key to the secrets manager under the "🔑" icon in the left panel. Give it the name `GOOGLE_API_KEY`. Then, the following code will retrieve and configure the API key for use with `google.generativeai`.

In [None]:
# import google.generativeai as genai
# # from google.colab import userdata

# # Retrieve API key from Colab secrets
# GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')

# # Configure the generative AI library
# genai.configure(api_key=GOOGLE_API_KEY)

# print("Google Generative AI API configured successfully.")

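Once the API is configured, you can use the `gemini-embedding-001` model to embed content. Here's an example (the Gemini cells in this notebook are commented out; the embeddings used below come from a local SentenceTransformer model instead):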

In [None]:
# result = genai.embed_content(
#     model="gemini-embedding-001",
#     content="What is the meaning of life?"
# )

# print("Embedding for 'What is the meaning of life?':")
# print(result['embedding'])

In [None]:
# import google.generativeai as genai
# from google.colab import userdata

# # Ensure your GOOGLE_API_KEY is set in Colab secrets
# # This cell should be executed after the GOOGLE_API_KEY is properly configured
# GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
# genai.configure(api_key=GOOGLE_API_KEY)

# embedding_model_name = "gemini-embedding-001"
# print(f"Using embedding model: {embedding_model_name}")


In [None]:
# # import google.generativeai as genai
# # from tqdm.auto import tqdm

# # Ensure GOOGLE_API_KEY is configured (from the previous cell)
# # gemini-embedding-001 is a cloud-based model, so no .to("cuda") is needed

# # for item in tqdm(pages_and_chunks_over_min_length):
# #     # Embed content using the Gemini embedding model
# #     response = genai.embed_content(
# #         model=embedding_model_name,
# #         content=item["sentence_chunk"]
# #     )
# #     item["embedding"] = response['embedding']

# # print("Embeddings generated successfully using Gemini embedding model.")


In [None]:
import torch
from sentence_transformers import SentenceTransformer

# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

# Load the SentenceTransformer model locally
embedding_model = SentenceTransformer(model_name_or_path="all-mpnet-base-v2", device=device)
print(f"SentenceTransformer model 'all-mpnet-base-v2' loaded successfully to {device}.")

In [None]:
from tqdm.auto import tqdm

# The embedding_model (SentenceTransformer) is assumed to be loaded in a previous cell.

# Encode all chunks with one batched call instead of one model call per chunk:
# batching is the intended way to use encode() and is considerably faster, especially on GPU.
chunk_texts = [item["sentence_chunk"] for item in pages_and_chunks_over_min_length]
chunk_embeddings = embedding_model.encode(chunk_texts,
                                          batch_size=32,
                                          convert_to_tensor=False,  # return NumPy arrays
                                          show_progress_bar=True)

# Attach each embedding to its chunk record as a plain list (easier to store in a DataFrame later)
for item, chunk_embedding in zip(pages_and_chunks_over_min_length, chunk_embeddings):
    item["embedding"] = chunk_embedding.tolist()

print("Embeddings generated successfully using SentenceTransformer model.")

In [None]:
# Put the filtered chunk records (including their "embedding" lists) into a DataFrame
text_chunk_and_embeddings_df = pd.DataFrame(pages_and_chunks_over_min_length)
embeddings_df_save_path = "text_chunk_and_embeddings_df.csv"

# Write the DataFrame to CSV without the index column.
# NOTE(review): CSV stores each embedding as text, so it has to be parsed back into numbers on load.
text_chunk_and_embeddings_df.to_csv(embeddings_df_save_path, index = False)

In [None]:
# Read the saved CSV back in and preview it to check the round trip
text_chunk_and_embeddings_df_load = pd.read_csv(embeddings_df_save_path)
text_chunk_and_embeddings_df_load.head()

In [None]:
import random

import torch
import numpy as np
import pandas as pd
# Removed json import as it's no longer used for parsing this string format

# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

# Import texts and embedding df
# (the literal filename is the same value assigned to embeddings_df_save_path when the CSV was written)
text_chunks_and_embedding_df = pd.read_csv("text_chunk_and_embeddings_df.csv")

# Function to parse the string representation of a numpy array back into a numpy array
def parse_embedding_string_to_array(embedding_str):
    """
    Parse the text form of an embedding (as read back from the CSV) into a float32 NumPy array.

    Both separators are accepted:
      * comma-separated, e.g. "[0.0674, 0.0902, -0.0051]" (string form of a Python list)
      * whitespace-separated, e.g. "[ 0.0674  0.0902 -0.0051]" (string form of a NumPy array)

    Args:
        embedding_str: The cell value from the "embedding" column.

    Returns:
        A 1-D float32 array of the parsed numbers. An empty float32 array is returned
        when the value is not a string (e.g. NaN from an empty cell) or contains
        something that is not a number; the latter also prints a warning.
    """
    # Non-string cells (such as NaN for a missing value) cannot be parsed; report "no embedding"
    if not isinstance(embedding_str, str):
        return np.array([], dtype=np.float32)

    # A '...' marks a truncated array repr: keep only the numbers before it.
    # This is a heuristic; the shortened result will not have the full embedding dimension.
    if '...' in embedding_str:
        embedding_str = embedding_str.split('...')[0].strip() + ']'

    # Drop the outer brackets and turn commas into spaces so that one whitespace split
    # handles both the list-style and the NumPy-style text forms.
    cleaned_str = embedding_str.strip('[] ').replace(',', ' ')

    try:
        float_list = [float(num_str) for num_str in cleaned_str.split()]
        return np.array(float_list, dtype=np.float32)
    except ValueError:
        print(f"Warning: Could not fully parse embedding string: '{embedding_str}'. Returning empty array.")
        return np.array([], dtype=np.float32)

# Apply the custom parsing function to the 'embedding' column
text_chunks_and_embedding_df["embedding"] = text_chunks_and_embedding_df["embedding"].apply(parse_embedding_string_to_array)

# Filter out any rows where parsing might have failed or resulted in an incorrect embedding dimension
# The embedding model 'all-mpnet-base-v2' produces 768-dimensional embeddings.
expected_embedding_dim = 768
text_chunks_and_embedding_df = text_chunks_and_embedding_df[
    text_chunks_and_embedding_df["embedding"].apply(lambda x: len(x) == expected_embedding_dim)
].reset_index(drop=True)

# Convert texts and embedding df to list of dicts
# (this rebinds `pages_and_chunks`, so its positions line up with the rows kept by the filter above)
pages_and_chunks = text_chunks_and_embedding_df.to_dict(orient="records")

# Stack the per-row arrays into one array, convert it to a float32 torch tensor and move it to `device`
embeddings = torch.tensor(np.array(text_chunks_and_embedding_df["embedding"].tolist()), dtype=torch.float32).to(device)
# NOTE(review): if the filter removed every row this tensor is empty - check the shape shown below
embeddings.shape

In [None]:
# Preview the parsed and filtered chunk/embedding DataFrame
text_chunks_and_embedding_df.head()

In [None]:
# Show the first row of the embeddings tensor
embeddings[0]

In [None]:
from sentence_transformers import util, SentenceTransformer
# Instantiate the same 'all-mpnet-base-v2' model again on `device`; this rebinds `embedding_model`
embedding_model = SentenceTransformer(model_name_or_path="all-mpnet-base-v2", device=device)

In [None]:
# Embed an example query and score it against every chunk embedding with a dot product
query = "macronutrients functions"
query_embedding = embedding_model.encode(query, convert_to_tensor=True)
dot_scores = util.dot_score(query_embedding, embeddings)[0]
# Keep the five highest-scoring chunks (values and their indices)
top_results_of_dot_scores = torch.topk(input=dot_scores, k=5)
top_results_of_dot_scores

In [None]:
import textwrap

def print_wrapped(text,wrap_length=80):
  """
  Print `text` wrapped so that no output line is longer than `wrap_length` characters.

  Args:
    text: The string to print.
    wrap_length: Maximum width of each printed line (default 80).
  """
  # textwrap.fill returns one newline-joined string; textwrap.wrap returns a list of lines,
  # and printing that list would show its repr instead of the wrapped text.
  print(textwrap.fill(text, wrap_length))

In [None]:
print(f"Query: '{query}'\n")
print("Results:")
# Loop through zipped together scores and indices from torch.topk
for score, idx in zip(top_results_of_dot_scores[0], top_results_of_dot_scores[1]):
    print(f"Score: {score:.4f}")
    # Print relevant sentence chunk (since the scores are in descending order, the most relevant chunk will be first)
    print("Text:")
    print_wrapped(pages_and_chunks[idx]["sentence_chunk"])
    # Print the page number too so we can reference the textbook further (and check the results)
    print(f"Page number: {pages_and_chunks[idx]['page_number']}")
    print("\n")


In [None]:
from sentence_transformers import util, SentenceTransformer
# Instantiate the 'all-mpnet-base-v2' model on `device`; the functions below use it as their default model
embedding_model = SentenceTransformer(model_name_or_path="all-mpnet-base-v2", device=device)

def retrieve_element(query: str,
                     embeddings: torch.Tensor,
                     model: SentenceTransformer=embedding_model,
                     n_of_resources_to_return: int = 5,
                     ):
  """
  Embed `query` with `model` and return the best-matching rows of `embeddings`.

  Args:
    query: The search text.
    embeddings: Tensor of chunk embeddings to score against, one row per chunk.
    model: SentenceTransformer used to embed the query (defaults to the notebook's embedding_model).
    n_of_resources_to_return: Maximum number of results; capped at the number of rows in `embeddings`.

  Returns:
    A (scores, indices) tuple from torch.topk: dot-product scores in descending order
    and the row indices of the corresponding embeddings.
  """
  # Use the model that was passed in rather than always falling back to the global one
  query_embedding = model.encode(query, convert_to_tensor=True)
  dot_scores = util.dot_score(query_embedding, embeddings)[0]
  # torch.topk raises if k is larger than the number of scores, so cap it
  k = min(n_of_resources_to_return, dot_scores.shape[0])
  scores, indices = torch.topk(input=dot_scores, k=k)
  return scores, indices


def print_top_results_and_scores(query: str,
                                 embeddings: torch.tensor,
                                 pages_and_chunks: list[dict]=pages_and_chunks,
                                 n_resources_to_return: int=5):
    """
    Retrieve the chunks most relevant to `query` and print them, best match first.

    For each result the score, the wrapped chunk text and the source page number are printed.

    Note: every entry of pages_and_chunks must provide the "sentence_chunk" and "page_number" keys.
    """
    top_scores, top_indices = retrieve_element(query=query,
                                               embeddings=embeddings,
                                               n_of_resources_to_return=n_resources_to_return)

    print(f"Query: {query}\n")
    print("Results:")
    # Walk the results by rank; retrieval returns them already sorted by descending score
    for rank in range(len(top_scores)):
        chunk = pages_and_chunks[top_indices[rank]]
        print(f"Score: {top_scores[rank]:.4f}")
        print_wrapped(chunk["sentence_chunk"])
        # The page number lets the reader look the passage up in the textbook
        print(f"Page number: {chunk['page_number']}")
        print("\n")

In [None]:
# Retrieve the top five scores and indices for an example query and display them
query = "symptoms of pellagra"
scores, indices = retrieve_element(query=query,
                                   embeddings=embeddings,
                                   n_of_resources_to_return=5)
scores, indices

In [None]:
# Print the retrieved chunks (score, text, page number) for the same example query
query = "symptoms of pellagra"
print_top_results_and_scores(query=query,
                                   embeddings=embeddings,
                                   pages_and_chunks=pages_and_chunks)

In [None]:
# Import the notebook login utility
from huggingface_hub import notebook_login

# Log in using the token stored in Colab secrets
# If HF_TOKEN is in Colab secrets and enabled for the notebook, this will use it automatically.
# Otherwise, it will prompt you to paste your token.
notebook_login()

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers.utils import is_flash_attn_2_available
# 1. Create the quantization config: 4-bit weights with float16 as the compute dtype
from transformers import BitsAndBytesConfig
quantization_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype = torch.float16)
# The attention implementation is fixed to "sdpa"; is_flash_attn_2_available is imported but not used in this cell
attn_implementation = "sdpa"
print(f"[INFO] Using attention implementation: {attn_implementation}")

# Toggle for passing quantization_config to from_pretrained below
use_quantization_config = True

# 2. Pick a model we'd like to use (this will depend on how much GPU memory you have available)
model_id = "google/gemma-2b-it"
print(f"[INFO] Using model_id: {model_id}")

# 3. Instantiate tokenizer (tokenizer turns text into numbers ready for the model)
tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_id)

# 4. Instantiate the model
llm_model = AutoModelForCausalLM.from_pretrained(
    pretrained_model_name_or_path=model_id,
    torch_dtype=torch.float16,  # datatype to use, we want float16
    quantization_config=quantization_config if use_quantization_config else None,
    low_cpu_mem_usage=False,  # use full memory
    attn_implementation=attn_implementation  # which attention version to use
)

if not use_quantization_config:  # quantization takes care of device setting automatically, so if it's not used, send model to GPU
    llm_model.to("cuda")


In [None]:
# Display the loaded model object
llm_model

In [None]:
def get_model_param(model: torch.nn.Module):
  """Return the total number of elements over all parameters of `model`."""
  total = 0
  for param in model.parameters():
    total += param.numel()
  return total

# Total parameter element count of the loaded LLM
get_model_param(llm_model)

In [None]:
input_text = "What are the macronutrients, and what roles do they play in the human body?"
print(f"Input text:\n{input_text}")

# Single-turn conversation in the role/content format expected by the chat template
dialogue_template = [{"role": "user", "content": input_text}]

# Render the conversation as raw prompt text (not token ids), ending with the generation prompt
prompt = tokenizer.apply_chat_template(
    conversation=dialogue_template,
    tokenize=False,
    add_generation_prompt=True,
)

print(f"\nPrompt (formatted):\n{prompt}")

In [None]:
# Tokenize the input text (turn it into numbers) and send it to GPU
# NOTE(review): the device is hard-coded to "cuda", so this cell needs a CUDA GPU
input_ids = tokenizer(prompt, return_tensors="pt").to("cuda")
print(f"Model input (tokenized):\n{input_ids}\n")

# Generate outputs based on the tokenized input
# See generate docs: https://huggingface.co/docs/transformers/v4.38.2/en/main_classes/text_generation#transformers.GenerationConfig
outputs = llm_model.generate(**input_ids,
                             max_new_tokens=256)  # define the maximum number of new tokens to create

print(f"Model output (tokens):\n{outputs[0]}\n")

In [None]:
# Decode the first generated sequence back into text
outputs_decoded = tokenizer.decode(outputs[0])
print(f"Model output (decoded):\n{outputs_decoded}\n")

In [None]:
# Nutrition-style questions generated with GPT4
gpt4_questions = [
"What are the macronutrients, and what roles do they play in the human body?",
"How do vitamins and minerals differ in their roles and importance for health?",
"Describe the process of digestion and absorption of nutrients in the human body.",
"What role does fibre play in digestion? Name five fibre containing foods.",
"Explain the concept of energy balance and its importance in weight management."
]

# Manually created question list
manual_questions = [
"How often should infants be breastfed?",
"What are symptoms of pellagra?",
"How does saliva help with digestion?",
"What is the RDI for protein per day?",
"water soluble vitamins"
]

# Combined pool that later cells draw a random query from
query_list = gpt4_questions + manual_questions

In [None]:
import random

# Pick one question at random from the combined list
query = random.choice(query_list)

print(f"Query: {query}")

# Get just the scores and indices of top related results
scores, indices = retrieve_element(query=query,
                                              embeddings=embeddings)

scores, indices

In [None]:
def prompt_formatter(query: str,
                     context_items: list[dict]) -> str:
    """
    Build the chat-formatted prompt that asks the model to answer `query` from retrieved context.

    Each entry of context_items contributes its "sentence_chunk" text as one bullet of the
    context section; the filled-in instructions are then wrapped as a single user turn and
    rendered with the tokenizer's chat template (as text, with the generation prompt appended).
    """
    # Render the context as a "- " bulleted list, one chunk per bullet
    context = "- " + "\n- ".join(item["sentence_chunk"] for item in context_items)

    # Instruction template with three worked examples of the desired answer style.
    # {context} and {query} are filled in below.
    prompt_template = """Based on the following context items, please answer the query.
Give yourself room to think by extracting relevant passages from the context before answering the query.
Don't return the thinking, only return the answer.
Make sure your answers are as explanatory as possible.
Use the following examples as reference for the ideal answer style.
\nExample 1:
Query: What are the fat-soluble vitamins?
Answer: The fat-soluble vitamins include Vitamin A, Vitamin D, Vitamin E, and Vitamin K. These vitamins are absorbed along with fat in the diet and can be stored in the body's fatty tissue.
\nExample 2:
Query: What are the causes of type 2 diabetes?
Answer: Type 2 diabetes is often associated with overeating, particularly the overconsumption of calories leading to obesity. Factors like lack of physical activity and genetic predisposition also contribute.
\nExample 3:
Query: What is the importance of hydration for physical performance?
Answer: Hydration is crucial for physical performance because water plays key roles in maintaining blood volume, regulating body temperature, and enabling muscle contractions.
\nNow use the following context items to answer the user query:
{context}. The context contains the answer. Look carefully and extract relevant information.
\nRelevant passages: <extract relevant passages from the context here>
User query: {query}
Answer:"""

    filled_prompt = prompt_template.format(context=context, query=query)

    # Wrap the instructions as a single user message for the instruction-tuned model
    messages = [{"role": "user", "content": filled_prompt}]

    return tokenizer.apply_chat_template(conversation=messages,
                                         tokenize=False,
                                         add_generation_prompt=True)

In [None]:
import random

# Pick one question at random from the combined list
query = random.choice(query_list)
print(f"Query: {query}")

# Get relevant resources
scores, indices = retrieve_element(query=query,
                                             embeddings=embeddings)

# Create a list of context items (the chunk records at the retrieved indices)
context_items = [pages_and_chunks[i] for i in indices]

# Format prompt with context items
prompt = prompt_formatter(query=query,
                         context_items=context_items)
print(prompt)

In [None]:
%%time

# Tokenize the RAG prompt built in the previous cell and move it to the GPU
# NOTE(review): the device is hard-coded to "cuda", so this cell needs a CUDA GPU
input_ids = tokenizer(prompt, return_tensors="pt").to("cuda")

# Generate an output of tokens
outputs = llm_model.generate(**input_ids,
                            temperature=0.7, # lower temperature = less random
                            do_sample=True, # whether or not to use sampling
                            max_new_tokens=256) # how many new tokens to generate

# Turn the output tokens into text
output_text = tokenizer.decode(outputs[0])

print(f"Query: {query}")
# Remove the prompt text from the decoded output before printing the answer
print(f"RAG answer:\n{output_text.replace(prompt, '')}")

In [None]:
def ask(query,
        temperature=0.7,
        max_new_tokens=512,
        format_answer_text=True,
        return_answer_only=True):
    """
    Answer `query` with retrieval-augmented generation.

    Retrieves the most relevant chunks for the query, builds a prompt from them,
    generates a completion with the LLM and returns the generated text.

    Args:
        query: The user question.
        temperature: Sampling temperature passed to generate (lower = less random).
        max_new_tokens: Maximum number of new tokens to generate.
        format_answer_text: When True, strip the prompt, the "<bos>"/"<eos>" markers and a
            boilerplate lead-in sentence from the generated text.
        return_answer_only: When True return only the answer text; otherwise also return
            the context items that were used.

    Returns:
        The answer text, or a tuple (answer text, context_items) when return_answer_only is
        False. Each context item is a copy of the retrieved chunk record with an extra
        "score" entry holding its retrieval score.
    """

    # Get just the scores and indices of top related results
    scores, indices = retrieve_element(query=query,
                                       embeddings=embeddings)

    # Copy each retrieved record so that attaching a score does not modify
    # the shared entries of pages_and_chunks
    context_items = [dict(pages_and_chunks[i]) for i in indices]

    # Add score to context item
    for i, item in enumerate(context_items):
        item["score"] = scores[i].cpu() # return score back to CPU

    # Format the prompt with context items
    prompt = prompt_formatter(query=query,
                              context_items=context_items)

    # Tokenize the prompt and place it on the same device as the model
    input_ids = tokenizer(prompt, return_tensors="pt").to(llm_model.device)

    # Generate an answer for THIS prompt, honouring the sampling arguments.
    # (Without this call the function would decode whatever `outputs` an earlier cell left behind.)
    outputs = llm_model.generate(**input_ids,
                                 temperature=temperature,
                                 do_sample=True,
                                 max_new_tokens=max_new_tokens)

    # Turn the output tokens into text
    output_text = tokenizer.decode(outputs[0])

    if format_answer_text:
        # Replace special tokens and unnecessary help message
        output_text = output_text.replace(prompt, "").replace("<bos>", "").replace("<eos>", "").replace("Sure, here is the answer to the user query:\n\n", "")

    # Only return the answer without the context items
    if return_answer_only:
        return output_text

    return output_text, context_items

In [None]:
# Pick one question at random (`random` is imported in an earlier cell)
query = random.choice(query_list)
print(f"Query: {query}")

# Answer query with context and return context
answer, context_items = ask(query=query,
                            temperature=0.7,
                            max_new_tokens=512,
                            return_answer_only=False)

print(f"\nAnswer:\n")
print_wrapped(answer)
print(f"Context items:")
# Display the context items that were used for the answer
context_items
