<a href="https://colab.research.google.com/github/anuvarghese4/Grand-School-Projects--/blob/main/RAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# before running the code, run this cell once and restart and run all
%%capture --no-stderr
%pip install numpy==1.26.4 langchain-community langchain-openai langchain-chroma gradio==3.38
%pip install nbimporter
%pip install langsmith
%pip install huggingface_hub

In [3]:
import warnings
warnings.filterwarnings('ignore')
from bs4 import BeautifulSoup
from urllib.request import Request, urlopen
import ssl
import os
import time
import requests
import nbimporter
import gradio as gr
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma.vectorstores import Chroma
from langchain.memory import ConversationBufferMemory
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.prompts import MessagesPlaceholder
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_history_aware_retriever
from langchain.chains import create_retrieval_chain
from langchain.agents import Tool, AgentType, initialize_agent
from google.colab import drive
# ignore the warnings as they do not affect the code

ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

In [None]:
# Folder that holds this notebook and constant.py -- change it to your own base path.
base_path = '/content'
# Switch the working directory to that folder, then display its contents.
os.chdir(base_path)
os.listdir(base_path)

In [None]:
# Read the API keys from constant.py and publish them as environment variables.
from constant import OPENAI_API_KEY, LANGSMITH_API_KEY, HF_TOKEN

os.environ.update(
    {
        'OPENAI_API_KEY': OPENAI_API_KEY,
        'LANGSMITH_TRACING': 'true',
        'LANGSMITH_API_KEY': LANGSMITH_API_KEY,
        'HF_TOKEN': HF_TOKEN,
        'USER_AGENT': 'Mozilla/5.0',
    }
)

# Sanity check: print only the first five characters of each key.
for key_value in (OPENAI_API_KEY, LANGSMITH_API_KEY):
    print(key_value[:5])

In [None]:
# function to get sitemap from the url

def get_sitemap(url, timeout=30):
    """Download a sitemap and parse it as XML.

    Args:
        url: Address of the sitemap to fetch.
        timeout: Seconds to wait on the connection before giving up.

    Returns:
        The parsed document as a ``BeautifulSoup`` object, or ``None`` when the
        request or the parsing fails (the error is printed, not raised).
    """
    try:
        # The request is sent with a browser-like User-Agent header.
        req = Request(
            url = url,
            headers={'User-Agent': 'Mozilla/5.0'}
        )

        # The context manager closes the connection even when reading fails, and
        # the timeout keeps a stalled server from hanging the notebook forever.
        with urlopen(req, timeout=timeout) as response:
            # Fall back to UTF-8 when the response declares no charset.
            encoding = response.headers.get_content_charset('utf-8')
            xml_data = response.read().decode(encoding)

        return BeautifulSoup(xml_data, "xml")

    except Exception as e:
        print(f"Error fetching sitemap: {e}")
        return None

In [None]:
# get urls related to breast cancer (urls containing 'video' are skipped)

def get_urls(xml, keyword='breast', exclude='video'):
  """Collect the page URLs of a parsed sitemap that contain ``keyword``.

  Args:
    xml: Parsed sitemap exposing ``find_all``/``find`` (for example the value
      returned by ``get_sitemap``). ``None`` is accepted and yields ``[]``.
    keyword: Substring a URL must contain to be kept.
    exclude: Substring that disqualifies a URL; pass ``None`` or ``''`` to
      switch the exclusion off.

  Returns:
    The matching URL strings, in the order they appear in the sitemap.
  """
  # get_sitemap returns None on failure; treat that as "no URLs".
  if xml is None:
    return []

  urls = []
  for url in xml.find_all('url'):
    loc_tag = url.find('loc')
    if loc_tag is None:
      continue
    # Strip surrounding whitespace so later suffix checks on the URL work.
    loc = loc_tag.text.strip()
    if keyword in loc and not (exclude and exclude in loc):
      urls.append(loc)

  return urls

In [None]:
# retrieve articles with breast cancer
url = "https://www.cancer.gov/sitemaps/pageinstructions.xml"
xml = get_sitemap(url)

# get_sitemap returns None when the download or parsing fails; stop here with a
# clear message instead of failing later on a None value.
if xml is None:
    raise RuntimeError(f"Could not download or parse the sitemap at {url}")

urls = get_urls(xml)

In [None]:
# Report how many urls matched and preview the first few rather than dumping the whole list.
print(f'{len(urls)} matching urls found')
urls[:10]

In [None]:
# Diagnostic only: list the installed packages and their versions.
%pip list

In [None]:
# Bucket every url by its file extension; anything unrecognised is treated as html.
_KNOWN_EXTENSIONS = ("pdf", "epub", "mobi")

categorized_urls = {file_type: [] for file_type in (*_KNOWN_EXTENSIONS, "html")}

for url in urls:
    # First extension the url ends with, or "html" when none matches.
    file_type = next(
        (ext for ext in _KNOWN_EXTENSIONS if url.endswith(f".{ext}")),
        "html",
    )
    categorized_urls[file_type].append(url)

In [None]:
# from htmls, exclude pdfs: some urls without a .pdf suffix still serve a PDF,
# which is detected here from the Content-Type response header.
pdf_urls = []

for url in categorized_urls['html']:
    try:
        # stream=True defers downloading the body -- only the headers are needed --
        # and the context manager releases the connection afterwards.
        with requests.get(url, timeout=10, stream=True) as response:
            content_type = response.headers.get("Content-Type", "")

        if "pdf" in content_type.lower():
            pdf_urls.append(url)
            print(f"Found PDF: {url}")

        # Pause between requests.
        time.sleep(1)

    except Exception as e:
        # Best effort: report the failing url and keep checking the others.
        print(f" Error fetching {url}: {e}")

In [None]:
# Move the urls detected as PDFs out of the html bucket and into the pdf bucket.
# Written so that re-running the cell does not append the same PDFs twice.
detected_pdfs = set(pdf_urls)
already_listed = set(categorized_urls["pdf"])

categorized_urls["html"] = [
    html_url for html_url in categorized_urls["html"] if html_url not in detected_pdfs
]
categorized_urls["pdf"].extend(
    pdf_url for pdf_url in pdf_urls if pdf_url not in already_listed
)

In [None]:
# load documents
from langchain_community.document_loaders import WebBaseLoader

docs = []
failed_urls = []

# Count from 1 so the progress message reports how many urls were actually handled.
for i, url in enumerate(categorized_urls['html'], start=1):
  try:
    docs.extend(WebBaseLoader(url).load())
  except Exception as e:
    # One unreachable page should not throw away everything loaded so far.
    failed_urls.append(url)
    print(f' Error loading {url}: {e}')

  if i % 10 == 0:
    print(f'{i} html urls are processed')

if failed_urls:
  print(f'{len(docs)} html documents are loaded, {len(failed_urls)} urls failed')
else:
  print('All html documents are loaded')

In [None]:
# Build the retriever: split the loaded pages into chunks, embed the chunks with
# OpenAI embeddings, index them in Chroma and expose the index as a retriever.
embedding = OpenAIEmbeddings()

text_splitters = RecursiveCharacterTextSplitter()
documents = text_splitters.split_documents(docs)

vector_db = Chroma.from_documents(
    embedding = embedding,
    documents = documents,
)
retriever = vector_db.as_retriever()

In [None]:
# Question-answering chain: system instructions + chat history + the user's input.
llm = ChatOpenAI(temperature = 0.0, model = 'gpt-4o-mini')

# System instructions; the {context} placeholder is left for the chain to fill in.
system_prompt = """
    You are an assistant in question-answering tasks.
    Provide answers using the retrieved context.
    If there is no relevent context for the question.
    Simply state, No relevant documents.
    Be brief and stick to the key points, and try to
    use the ordered numeric format if you can.
    The number of items does not matter.
    Ask follow-up questions if the question is incomplete or not clear
    \n\n
    {context}
    """

qa_messages = [
    ('system', system_prompt),
    MessagesPlaceholder('chat_history'),
    ('human', '{input}'),
]
prompt = ChatPromptTemplate.from_messages(qa_messages)

qa_chain = create_stuff_documents_chain(llm, prompt)

In [None]:
# create retriever chain with chat history
# This prompt only rewrites the latest user input into a standalone question, using
# the chat history and the input. It takes no retrieved context, so it carries no
# context placeholder (an escaped one would reach the model as literal text).
history_system_prompt = (
    """
    Given chat history and the latest user input, formulate a
    standalone question that can be understood without referencing
    chat history.

    This is NOT telling you to generate answers, but to reformulate
    the questions or return them as they are.
    """
)

history_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', history_system_prompt),
        MessagesPlaceholder('chat_history'),
        ('human', '{input}')
    ]
)

# The history-aware retriever feeds the qa_chain defined earlier.
history_aware_retriever = create_history_aware_retriever(llm, retriever, history_prompt)
rag_chain = create_retrieval_chain(history_aware_retriever, qa_chain)

In [None]:
# test the retriever
from langchain_core.messages import HumanMessage, AIMessage

# Conversation turns collected by rag_test() and passed to the chain as chat history.
test_history = []

def rag_test():
  """Run an interactive question/answer loop against ``rag_chain``.

  Questions are read from standard input until the user enters ``q`` or
  ``quit`` (case-insensitive) or the input is interrupted. Each question is
  sent to ``rag_chain`` with ``test_history`` as chat history, the answer is
  printed, and the exchange is appended to ``test_history``. Blank input is
  ignored.
  """
  while True:

    try:
      question = input('Enter your question: ').strip()
    except (EOFError, KeyboardInterrupt):
      # Interrupting the cell or closing stdin ends the chat without a traceback.
      print('Exiting Chat, Goodbye!')
      break

    if question.lower() in ['q', 'quit']:
      print('Exiting Chat, Goodbye!')
      break

    if not question:
      # Nothing was typed: ask again rather than sending an empty query.
      continue

    response = rag_chain.invoke({'input': question, 'chat_history': test_history})
    # Look the answer up once and reuse it for both the history and the output.
    answer = response.get('answer', 'No response available')

    test_history.extend(
        [
            HumanMessage(content = question),
            AIMessage(content = answer)
        ]
    )

    print('\nAnswer:\n')
    print(answer)
    print('-' * 50)
    print('\n')

# Start the interactive loop; this cell keeps running until 'q' or 'quit' is entered.
rag_test()

Ask questions like:

* What are the risk factors for breast cancer?
* How is breast cancer diagnosed?
* What are the different stages of breast cancer?

In [None]:
# create function to use rag chain in agent's tool
def domain_info(user_input: str, memory:ConversationBufferMemory) -> str:
  """Answer ``user_input`` with the RAG chain, using ``memory`` as chat history.

  The history stored under ``chat_history`` in ``memory`` is passed to
  ``rag_chain`` together with the input. The resulting answer (or a fallback
  text when the chain result has no ``answer`` key) is saved back into
  ``memory``, printed, and returned.
  """
  history = memory.load_memory_variables({})['chat_history']

  chain_output = rag_chain.invoke({'input': user_input, 'chat_history': history})
  answer = chain_output.get('answer', 'No response available')

  # Record this exchange so that later calls see it as history.
  memory.save_context({'input': user_input}, {'output': answer})

  print(answer)
  return answer

In [None]:
# Conversation memory handed to the RAG tool below and to the agent built from it.
global_memory = ConversationBufferMemory(
    output_key = 'output',
    input_key = 'input',
    return_messages = True,
    memory_key = 'chat_history'
)

rag_llm = ChatOpenAI(temperature = 0.0, model = 'gpt-4o-mini')


def _ask_domain_rag(user_input):
  """Pass the tool input to domain_info together with the shared memory."""
  return domain_info(user_input, global_memory)


# RAG tool offered to the agent.
domain_rag = Tool(
    name = 'DomainRAGInfo',
    func = _ask_domain_rag,
    description = """
    Use this tool for questions about breast cancer from the knowledge base that may
    rely on previous conversation.
    """
)

tools = [domain_rag]

In [None]:
# Conversational ReAct agent; it receives the same global_memory object the RAG tool uses.
domain_rag_agent = initialize_agent(
    tools,
    rag_llm,
    agent = AgentType.CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
    verbose = False,
    memory = global_memory,
)

In [None]:
# create agent utilizing function for gradio
def domain_rag_agent_fn(user_input, chat_history):
    """Handle one chat submission from the Gradio UI.

    Args:
        user_input: Text typed by the user.
        chat_history: List of ``(user, bot)`` tuples shown so far; ``None`` is
            treated as an empty history.

    Returns:
        A 3-tuple ``(history, history, "")``: the updated history twice (the
        event is wired to the chatbot component twice) and an empty string
        that clears the text box.
    """
    # Work on a copy so the caller's list is never mutated in place.
    history = list(chat_history or [])

    # Blank submissions are ignored rather than sent to the agent.
    if not user_input or not user_input.strip():
        return history, history, ""

    response = domain_rag_agent.run(input=user_input)
    history.append((user_input, response))
    return history, history, ""

In [None]:
# gradio user interface
with gr.Blocks() as demo:

  gr.Markdown('Cancer-QA BOT')

  # domain_rag_agent_fn returns the history as (user, bot) tuples, so the Chatbot
  # is left in its default format; type='messages' would expect role/content
  # dicts and does not match the gradio version pinned in the install cell.
  chatbot = gr.Chatbot()

  user_box = gr.Textbox(
      show_label = False,
      placeholder = 'Ask any question about cancer'
  )

  clear_btn = gr.Button('Clear Chat')

  # On submit: update the chat window and clear the text box (third output).
  user_box.submit(
      fn = domain_rag_agent_fn,
      inputs = [user_box, chatbot],
      outputs = [chatbot, chatbot, user_box]
  )

  def clear_memory():
    """Clear the shared conversation memory and return an empty chat history."""
    global_memory.clear()
    return []

  clear_btn.click(clear_memory, [], chatbot, queue=False)

demo.launch(debug=True)