# 1. Introduction to RAG with LangChain

In [1]:
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI


# Load environment variables from .env file
load_dotenv()


# Ensure the OpenAI API key is set in the environment
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

## 1.1. Process the document and build the vector store using ChromaDB

In [2]:
from langchain import hub
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

In [8]:
#%pip install --upgrade --quiet playwright beautifulsoup4 html2text
#!playwright install

In [9]:
#!pip install nest-asyncio
import nest_asyncio

nest_asyncio.apply()



In [10]:
from langchain_community.document_loaders import AsyncChromiumLoader

urls = ["https://simple.wikipedia.org/wiki/Earthquake"]
loader = AsyncChromiumLoader(urls)
docs = loader.load()
docs[0].page_content[0:100]

USER_AGENT environment variable not set, consider setting it to identify your requests.


'<!DOCTYPE html><html class="client-js vector-feature-language-in-header-enabled vector-feature-langu'

In [11]:
from langchain_community.document_transformers import Html2TextTransformer

html2text = Html2TextTransformer()
docs_transformed = html2text.transform_documents(docs)
docs_transformed[0].page_content[0:500]

'Jump to content\n\nMain menu\n\nMain menu\n\nmove to sidebar hide\n\nGetting around\n\n  * Main page\n  * Simple start\n  * Simple talk\n  * New changes\n  * Show any page\n  * Help\n  * Contact us\n  * Give to Wikipedia\n  * About Wikipedia\n\nSearch\n\nSearch\n\nAppearance\n\n  * Create account\n  * Log in\n\nPersonal tools\n\n  * Create account\n  * Log in\n\nPages for logged out editors learn more\n\n  * Contributions\n  * Talk\n\n## Contents\n\nmove to sidebar hide\n\n  * Beginning\n\n  * 1 Zones\n\n  * 2 History\n\n  * 3 Causes of an ear'

In [12]:
# Split text into chunks

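text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=20)
text_chunks = text_splitter.split_documents(docs_transformed)

# Embed the chunks and store them in a Chroma collection on disk
vectorstore = Chroma.from_documents(documents=text_chunks,
                                    embedding=OpenAIEmbeddings(),
                                    persist_directory="data/science")
# Since Chroma 0.4.x, documents are persisted automatically when persist_directory
# is set, so this call is deprecated (hence the warning below) and can usually be dropped.
vectorstore.persist()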

  warn_deprecated(
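
To make `chunk_size` and `chunk_overlap` more concrete, here is a simplified, dependency-free sketch of fixed-size chunking with overlap. It is only an illustration under stated assumptions: `sliding_chunks` is a hypothetical helper, and it is not how `RecursiveCharacterTextSplitter` works internally (that class also tries to split on separators such as paragraph and line breaks before falling back to smaller pieces).

```python
def sliding_chunks(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into fixed-size character windows that overlap (illustrative only)."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window already reaches the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks


sample = "abcdefghijklmnopqrstuvwxyz"
chunks = sliding_chunks(sample, chunk_size=10, chunk_overlap=2)
print(chunks)
```

Each chunk repeats the last two characters of the previous one, which is the role `chunk_overlap=20` plays in the cell above: a sentence cut at a chunk boundary still appears partly in both neighbors.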


In [13]:
vectorstore.similarity_search("What is earthquake?")

[Document(metadata={'source': 'https://simple.wikipedia.org/wiki/Earthquake'}, page_content="An **earthquake** is when Earth's tectonic plates shake and move Earth's\nsurface. Strong earthquakes damage buildings."),
 Document(metadata={'source': 'https://simple.wikipedia.org/wiki/Earthquake'}, page_content='Retrieved from\n"https://simple.wikipedia.org/w/index.php?title=Earthquake&oldid=9521108"\n\nCategories:\n\n  * Earthquakes\n  * Plate tectonics\n\nHidden categories:\n\n  * Commons category link is on Wikidata\n  * Webarchive template wayback links\n\n  * This page was last changed on 3 May 2024, at 08:24.\n  * Text is available under the Creative Commons Attribution-ShareAlike License and the GFDL; additional terms may apply. See Terms of Use for details.'),
 Document(metadata={'source': 'https://simple.wikipedia.org/wiki/Earthquake'}, page_content="Disturbances in the Earth cause earthquakes. Different tectonic plates are\nslowly moving. When they get stuck, tension builds up in 

## 1.2. Retriever initialization

In [15]:
retriever = vectorstore.as_retriever(search_kwargs={'k':5})

In [7]:
#pip install langchainhub

In [16]:
prompt = hub.pull('rlm/rag-prompt')
prompt

ChatPromptTemplate(input_variables=['context', 'question'], metadata={'lc_hub_owner': 'rlm', 'lc_hub_repo': 'rag-prompt', 'lc_hub_commit_hash': '50442af133e61576e74536c6556cefe1fac147cad032f4377b60c436e6cdcb6e'}, messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['context', 'question'], template="You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.\nQuestion: {question} \nContext: {context} \nAnswer:"))])

## 1.3. Creating the QA chain

In [17]:
# LLM
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

# Post-processing
def format_docs(docs):
    # Join the retrieved chunks into one context string, separated by blank lines
    return "\n\n".join(doc.page_content for doc in docs)

# Chain using LangChain Expression Language
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

In [18]:
rag_chain.invoke("What is an earthquake?")

"An earthquake is a sudden shaking of the Earth's surface caused by the movement of tectonic plates. This movement can lead to the release of built-up tension, resulting in damage to buildings and other structures. The point where the earthquake originates underground is called the hypocenter, while the point directly above it on the surface is known as the epicenter."

# 2. Query Transformation

The main idea behind query transformation is to rewrite the user's query so that the retriever fetches documents the LLM can actually use to answer it. For instance, if the user asks an ambiguous question, the retriever may return documents whose embeddings are close to the query but that are not relevant to what the user meant, which leads the LLM to hallucinate an answer.

In [1]:
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI


# Load environment variables from .env file
load_dotenv()


# Ensure the OpenAI API key is set in the environment
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

In [2]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain import hub
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.load import loads, dumps
from typing import List

In [3]:
import nest_asyncio

nest_asyncio.apply()

In [5]:
from langchain_community.document_loaders import AsyncChromiumLoader

urls = ["https://simple.wikipedia.org/wiki/Earthquake"]
loader = AsyncChromiumLoader(urls)
docs = loader.load()
docs[0].page_content[0:100]

USER_AGENT environment variable not set, consider setting it to identify your requests.


'<!DOCTYPE html><html class="client-js vector-feature-language-in-header-enabled vector-feature-langu'

In [6]:
from langchain_community.document_transformers import Html2TextTransformer

html2text = Html2TextTransformer()
docs_transformed = html2text.transform_documents(docs)
docs_transformed[0].page_content[0:500]

'Jump to content\n\nMain menu\n\nMain menu\n\nmove to sidebar hide\n\nGetting around\n\n  * Main page\n  * Simple start\n  * Simple talk\n  * New changes\n  * Show any page\n  * Help\n  * Contact us\n  * Give to Wikipedia\n  * About Wikipedia\n\nSearch\n\nSearch\n\nAppearance\n\n  * Create account\n  * Log in\n\nPersonal tools\n\n  * Create account\n  * Log in\n\nPages for logged out editors learn more\n\n  * Contributions\n  * Talk\n\n## Contents\n\nmove to sidebar hide\n\n  * Beginning\n\n  * 1 Zones\n\n  * 2 History\n\n  * 3 Causes of an ear'

In [11]:
# Split text into chunks
text_splitter  = RecursiveCharacterTextSplitter(chunk_size=500,chunk_overlap=20)
text_chunks = text_splitter.split_documents(docs_transformed)

vectorstore = Chroma.from_documents(documents=text_chunks, 
                                    embedding=OpenAIEmbeddings(),
                                    persist_directory="data/science")
vectorstore.persist()

In [12]:
retriever = vectorstore.as_retriever(search_kwargs={'k':5})

## 2.1. Query Translation

### 2.1.1. Multi-Query

In [27]:
from langchain.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template(
    """
    You are an intelligent assistant. Your task is to generate 5 questions based on the provided question in different wording and different perspectives to retrieve relevant documents from a vector database. By generating multiple perspectives on the user question, your goal is to help the user overcome some of the limitations of the distance-based similarity search. Provide these alternative questions separated by newlines. Original question: {question}
    """
)

generate_queries = (
    {"question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model='gpt-4o-mini', temperature=0.7)
    | StrOutputParser()
    | (lambda x: x.split("\n"))
)

In [28]:
generate_queries.invoke("What are the causes of an earthquake?")

['1. What factors contribute to the occurrence of earthquakes?',
 '2. What geological processes lead to the formation of earthquakes?',
 '3. Can you explain the main reasons behind the triggering of earthquakes?',
 '4. What are some natural phenomena that can result in an earthquake?',
 '5. How do tectonic plate movements influence the likelihood of earthquakes?']

In [15]:
def get_context_union(docs: List[List]):
    all_docs = [dumps(d) for doc in docs for d in doc]
    unique_docs = list(set(all_docs))
    
    return [loads(doc).page_content for doc in unique_docs] # We only return page contents


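# generate_queries already wraps its input as {"question": ...}, so the raw
# question string is piped straight into it (wrapping it twice would put a
# dict, not the question text, into the prompt)
retrieval_chain = (
    generate_queries
    | retriever.map()
    | get_context_union
)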
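
One detail of `get_context_union` worth noting: passing the results through `set` removes duplicates but does not keep the retrieval order. The sketch below shows an order-preserving alternative. It uses plain strings in place of serialized `Document` objects, and `unique_in_order` is a hypothetical helper name, not part of LangChain.

```python
from itertools import chain


def unique_in_order(result_lists: list[list[str]]) -> list[str]:
    """Flatten per-query results and drop duplicates, keeping first-seen order."""
    flattened = chain.from_iterable(result_lists)
    # dict preserves insertion order, so the first occurrence of each item wins
    return list(dict.fromkeys(flattened))


# Stand-ins for the chunks retrieved for three generated queries
per_query_results = [
    ["chunk A", "chunk B"],
    ["chunk B", "chunk C"],
    ["chunk A", "chunk D"],
]
merged = unique_in_order(per_query_results)
print(merged)
```

With real documents, the same idea could be applied to the strings produced by `dumps(d)` before calling `loads`.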

In [29]:
retrieval_chain.invoke("What are the causes of an earthquake?")

 "Disturbances in the Earth cause earthquakes. Different tectonic plates are\nslowly moving. When they get stuck, tension builds up in them. Earthquakes\noccur when tectonic plates suddenly break free, so they start moving quickly.\nThe first point of an earthquake's rupture is called its hypocenter or focus.\nThe epicenter is the point at ground level directly above the hypocenter.",
 '1. ↑ "Earthquake Zones | Physical Geography". _courses.lumenlearning.com_. Archived from the original on 2021-07-09. Retrieved 2021-07-09.\n  2. ↑ "Science of Earthquakes Seismic Zones". _PR Fire_. Archived from the original on 2021-07-09. Retrieved 2021-07-09.\n  3. ↑ Jump up to: 3.0 3.1 "What are aftershocks, foreshocks, and earthquake clusters?". Archived from the original on 2009-05-11. Retrieved 2017-08-31.',
 'Most earthquakes form part of a sequence, related to each other in terms of\nlocation and time.[3] Most earthquake clusters consist of small tremors which\ncause little to no damage, but the

In [30]:
prompt = ChatPromptTemplate.from_template(
    """
    Answer the given question using the provided context.\n\nContext: {context}\n\nQuestion: {question}
    """
)

multi_query_chain = (
    {'context': retrieval_chain, 'question': RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model='gpt-4o-mini', temperature=0)
    | StrOutputParser()
)

In [31]:
multi_query_chain.invoke("What are the causes of an earthquake?")

'The causes of an earthquake primarily involve disturbances in the Earth due to the movement of tectonic plates. These plates are slowly moving, and when they get stuck, tension builds up in them. Earthquakes occur when these tectonic plates suddenly break free and start moving quickly. Additionally, earthquakes can be caused by geological faults, which can be classified into three main types: normal faults (where the crust is being extended), reverse (thrust) faults (where the crust is being shortened), and strike-slip faults (where the two sides of the fault slip horizontally past each other). In some cases, the specific causes of earthquakes may not be known in detail, such as in the case of the 1811–1812 New Madrid earthquakes, which were attributed to deep shifts in ancient rock formations. Minor tremors can also occur due to collapse earthquakes, often related to intense mining activity.'

### 2.1.2. RAG Fusion

In [21]:
def rrf(results: List[List], k=60):
    # Initialize a dictionary to hold fused scores for each unique document
    fused_scores = {}

    # Iterate through each list of ranked documents
    for docs in results:
        # Iterate through each document in the list, with its rank (position in the list)
        for rank, doc in enumerate(docs):
            # Convert the document to a string format to use as a key (assumes documents can be serialized to JSON)
            doc_str = dumps(doc)
            # If the document is not yet in the fused_scores dictionary, add it with an initial score of 0
            if doc_str not in fused_scores:
                fused_scores[doc_str] = 0
            # Add this list's contribution using the RRF formula: 1 / (rank + k)
            fused_scores[doc_str] += 1 / (rank + k)

    # Sort the documents based on their fused scores in descending order to get the final reranked results
    reranked_results = [
        (loads(doc), score)
        for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]

    # Return the reranked results as a list of tuples, each containing the document and its fused score
    return reranked_results
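
The scoring can be checked by hand on a tiny example. The sketch below applies the same formula as `rrf` (zero-based rank, `k=60`) to plain string IDs instead of `Document` objects; `rrf_scores` and the document IDs are hypothetical and exist only for illustration.

```python
from collections import defaultdict


def rrf_scores(ranked_lists: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse several ranked lists with reciprocal rank fusion: sum of 1 / (rank + k)."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking):  # rank starts at 0, as in rrf above
            scores[doc_id] += 1 / (rank + k)
    # Highest fused score first
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# Three hypothetical result lists, one per generated query
rankings = [
    ["A", "B", "C"],
    ["B", "A", "D"],
    ["B", "C", "A"],
]
fused = rrf_scores(rankings)
for doc_id, score in fused:
    print(doc_id, round(score, 5))
```

Document `B` ends up first because it is ranked at the top of two lists, while `D` appears only once, in last place, and comes out at the bottom. A document that shows up in many lists accumulates score even if it is never ranked first.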

In [22]:
from langchain.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template(
    """
    You are an intelligent assistant. Your task is to generate 4 questions based on the provided question in different wording and different perspectives to retrieve relevant documents from a vector database. By generating multiple perspectives on the user question, your goal is to help the user overcome some of the limitations of the distance-based similarity search. Provide these alternative questions separated by newlines. Original question: {question}
    """
)

generate_queries = (
    {"question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model='gpt-4o-mini', temperature=0.7)
    | StrOutputParser()
    | (lambda x: x.split("\n"))
)


# generate_queries already wraps its input as {"question": ...}, so the raw
# question string is piped straight into it
fusion_retrieval_chain = (
    generate_queries
    | retriever.map()
    | rrf
)

In [32]:
fusion_retrieval_chain.invoke("What are the causes of an earthquake?")

[(Document(metadata={'source': 'https://simple.wikipedia.org/wiki/Earthquake'}, page_content="Disturbances in the Earth cause earthquakes. Different tectonic plates are\nslowly moving. When they get stuck, tension builds up in them. Earthquakes\noccur when tectonic plates suddenly break free, so they start moving quickly.\nThe first point of an earthquake's rupture is called its hypocenter or focus.\nThe epicenter is the point at ground level directly above the hypocenter."),
  0.09918032786885246),
 (Document(metadata={'source': 'https://simple.wikipedia.org/wiki/Earthquake'}, page_content='1. ↑ "Earthquake Zones | Physical Geography". _courses.lumenlearning.com_. Archived from the original on 2021-07-09. Retrieved 2021-07-09.\n  2. ↑ "Science of Earthquakes Seismic Zones". _PR Fire_. Archived from the original on 2021-07-09. Retrieved 2021-07-09.\n  3. ↑ Jump up to: 3.0 3.1 "What are aftershocks, foreshocks, and earthquake clusters?". Archived from the original on 2009-05-11. Retriev

In [24]:
def format_context(documents: List):
    return "\n\n".join([doc[0].page_content for doc in documents])


prompt = ChatPromptTemplate.from_template(
    """
    Answer the given question using the provided context.\n\nContext: {context}\n\nQuestion: {question}
    """
)

rag_fusion_chain = (
    {'context': fusion_retrieval_chain | format_context, 'question': RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model='gpt-4o-mini', temperature=0)
    | StrOutputParser()
)

In [33]:
rag_fusion_chain.invoke("What are the causes of an earthquake?")

"The causes of an earthquake primarily involve disturbances in the Earth's crust due to the movement of tectonic plates. When these plates get stuck, tension builds up, and earthquakes occur when they suddenly break free and start moving quickly. The main causes include:\n\n1. Tectonic movements in the Earth's crust, particularly when plates ride over one another, leading to orogeny (mountain building) and severe earthquakes.\n2. Geological faults, which can be classified into three main types: normal faults (where the crust is being extended), reverse (thrust) faults (where the crust is being shortened), and strike-slip faults (where the two sides of the fault slip horizontally past each other).\n3. In some cases, the causes of earthquakes are not well understood, such as the New Madrid earthquakes, which were attributed to deep shifts in ancient rock formations.\n4. Collapse earthquakes can occur in areas of intense mining activity, where the roofs of underground mines collapse, caus

## 2.2. Query Decomposition

### 2.2.1. Least-to-Most Prompting

In [34]:
from langchain.prompts import ChatPromptTemplate

decomposition_prompt = ChatPromptTemplate.from_template(
    """
    You are a helpful assistant that can break down complex questions into simpler parts. \n
        Your goal is to decompose the given question into multiple sub-questions that can be answered in isolation to answer the main question in the end. \n
        Provide these sub-questions separated by the newline character. \n
        Original question: {question}\n
        Output (3 queries): 
    """
)

query_generation_chain = (
    {"question": RunnablePassthrough()}
    | decomposition_prompt
    | ChatOpenAI(model='gpt-4o-mini', temperature=0.7)
    | StrOutputParser()
    | (lambda x: x.split("\n"))
)

In [35]:
questions = query_generation_chain.invoke("What are the causes of an earthquake?")
questions

['What are the different types of tectonic plate movements that can lead to earthquakes?',
 '',
 'What role do geological faults play in the occurrence of earthquakes?',
 '',
 'How does human activity contribute to the occurrence of earthquakes?']
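
The output above contains empty strings because the model separated its sub-questions with blank lines and `split("\n")` keeps them; the Multi-Query output earlier also carried leading numbering such as `1.`. A small parser can clean both up before the queries reach the retriever. This is a sketch under assumptions: `parse_generated_queries` is a hypothetical helper, and the sample text is invented for illustration.

```python
import re


def parse_generated_queries(raw_output: str) -> list[str]:
    """Split LLM output into queries, dropping blank lines and list markers."""
    queries = []
    for line in raw_output.split("\n"):
        # Remove leading list markers such as "1. ", "2) ", "- " or "* "
        cleaned = re.sub(r"^\s*(?:\d+[.)]|[-*])\s+", "", line).strip()
        if cleaned:
            queries.append(cleaned)
    return queries


raw = (
    "1. What causes plates to move?\n"
    "\n"
    "2. What is a fault?\n"
    "\n"
    "- How do humans trigger earthquakes?"
)
parsed = parse_generated_queries(raw)
print(parsed)
```

Because LCEL coerces plain functions into runnables, a helper like this could take the place of the `lambda x: x.split("\n")` step in the chain.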

In [36]:
from operator import itemgetter


# Create the final prompt template to answer the question with provided context and background Q&A pairs
template = """Here is the question you need to answer:

\n --- \n {question} \n --- \n

Here is any available background question + answer pairs:

\n --- \n {q_a_pairs} \n --- \n

Here is additional context relevant to the question: 

\n --- \n {context} \n --- \n

Use the above context and any background question + answer pairs to answer the question: \n {question}
"""

least_to_most_prompt = ChatPromptTemplate.from_template(template) 
llm = ChatOpenAI(model='gpt-4o-mini', temperature=0)

least_to_most_chain = (
        {'context': itemgetter('question') | retriever,
        'q_a_pairs': itemgetter('q_a_pairs'),
        'question': itemgetter('question'),
        }
        | least_to_most_prompt
        | llm
        | StrOutputParser()
    )

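# Answer the sub-questions in order, feeding earlier Q&A pairs into later prompts
q_a_pairs = ""
for q in questions:
    if not q.strip():
        continue  # skip the blank lines left over from splitting the model output
    answer = least_to_most_chain.invoke({"question": q, "q_a_pairs": q_a_pairs})
    q_a_pairs += f"Question: {q}\n\nAnswer: {answer}\n\n"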
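
The key property of this loop is that each sub-question is answered with all earlier question and answer pairs in view. The sketch below isolates that accumulation pattern without calling an LLM: `answer_sequentially` and `stub_answer` are hypothetical names, and the stub simply reports how many earlier pairs it was given.

```python
from typing import Callable


def answer_sequentially(sub_questions: list[str],
                        answer_fn: Callable[[str, str], str]) -> str:
    """Answer sub-questions in order, passing the accumulated Q&A text to each call."""
    q_a_pairs = ""
    for q in sub_questions:
        if not q.strip():
            continue  # ignore blank entries
        answer = answer_fn(q, q_a_pairs)
        q_a_pairs += f"Question: {q}\n\nAnswer: {answer}\n\n"
    return q_a_pairs


def stub_answer(question: str, background: str) -> str:
    # Stand-in for the LLM call: counts the earlier pairs visible in the background
    seen = background.count("Question: ")
    return f"(stub answer, saw {seen} earlier pair(s))"


pairs = answer_sequentially(["What is a fault?", "", "Why do plates move?"], stub_answer)
print(pairs)
```

In the notebook, `answer_fn` corresponds to `least_to_most_chain.invoke` with the `question` and `q_a_pairs` keys.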

In [37]:
least_to_most_chain.invoke({"question": "What are the causes of an earthquake?", "q_a_pairs": q_a_pairs})

"Earthquakes are primarily caused by the movement of tectonic plates in the Earth's crust. Here are the main causes:\n\n1. **Tectonic Plate Movements**: The Earth's crust is divided into several large and small tectonic plates that are constantly moving, albeit very slowly. When these plates interact, they can become stuck at their edges due to friction. As the plates continue to move, tension builds up in the rocks along geological faults. When the stress exceeds the strength of the rocks, it results in a sudden release of energy, causing the rocks to break and slip along the fault line, which generates seismic waves that we perceive as an earthquake.\n\n2. **Types of Faults**: The movement of tectonic plates can lead to different types of faults, which are critical in the occurrence of earthquakes:\n   - **Normal Faults**: Occur in regions where the crust is being extended, causing one block of crust to move downward relative to another.\n   - **Reverse (Thrust) Faults**: Occur in ar

In [38]:
prompt = hub.pull('rlm/rag-prompt')
prompt

ChatPromptTemplate(input_variables=['context', 'question'], metadata={'lc_hub_owner': 'rlm', 'lc_hub_repo': 'rag-prompt', 'lc_hub_commit_hash': '50442af133e61576e74536c6556cefe1fac147cad032f4377b60c436e6cdcb6e'}, messages=[HumanMessagePromptTemplate(prompt=PromptTemplate(input_variables=['context', 'question'], template="You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise.\nQuestion: {question} \nContext: {context} \nAnswer:"))])

In [39]:
def generate_and_answer(question):
    """Answer each generated sub-question independently with the RAG prompt."""
    qa_pairs = []

    sub_questions = query_generation_chain.invoke(question)

    sub_qa_chain = (
        {'context': RunnablePassthrough() | retriever, 'question': RunnablePassthrough()}
        | prompt
        | ChatOpenAI(model='gpt-4o-mini', temperature=0)
        | StrOutputParser()
    )

    for q in sub_questions:
        if not q.strip():
            continue  # ignore blank lines left over from splitting the model output
        answer = sub_qa_chain.invoke(q)
        qa_pairs.append({"question": q, "answer": answer})

    return qa_pairs
        
qa_pairs = generate_and_answer("What are the causes of an earthquake?")

In [40]:
def format_qa_pairs(qa_pairs):
    
    formatted_string = ""
    
    # Number the pairs from 1 so the prompt reads "Question 1", "Question 2", ...
    for i, qa in enumerate(qa_pairs, start=1):
        formatted_string += f"Question {i}: {qa['question']}\nAnswer {i}: {qa['answer']}\n\n"
    return formatted_string.strip()

context = format_qa_pairs(qa_pairs)

# Prompt

prompt = ChatPromptTemplate.from_template(
    """
    Consider the following Question and Answer Pairs:

    {context}

    Use these to synthesize an answer to the question: {question}
    """
)

final_rag_chain = (
     prompt
    | ChatOpenAI(model='gpt-4o-mini', temperature=0)
    | StrOutputParser()
)

final_rag_chain.invoke({'context': context, 'question': "What are the causes of an earthquake?"})

'Earthquakes are primarily caused by geological processes related to the movement of tectonic plates. When these plates interact, they can become stuck due to friction, causing tension to build up over time. Eventually, this tension is released suddenly, resulting in an earthquake. The main types of geological faults involved in this process include normal, reverse (thrust), and strike-slip faults.\n\nIn addition to natural tectonic activity, human activities can also contribute to the occurrence of earthquakes. Activities such as mining, the construction of large dams (which can lead to reservoir-induced seismicity), and hydraulic fracturing (fracking) can induce seismic events. While these human-induced earthquakes are generally less powerful than those caused by tectonic movements, they can still have significant impacts.'

### 2.2.2. Step-Back Prompting

In [41]:
from langchain_core.prompts import ChatPromptTemplate, FewShotChatMessagePromptTemplate

examples = [
    {
        'input': 'What happens to the pressure, P, of an ideal gas if the temperature is increased by a factor of 2 and the volume is increased by a factor of 8?',
        'output': 'What are the physics principles behind this question?'
    },
    {
        'input': 'Estella Leopold went to which school between Aug 1954 and Nov 1954?',
        'output': "What was Estella Leopold's education history?"
    }
]
# Prompt template used to format each individual example as a human/AI exchange
example_prompt = ChatPromptTemplate.from_messages(
    [
        ('human', '{input}'), ('ai', '{output}')
    ]
)
few_shot_prompt = FewShotChatMessagePromptTemplate(
    examples=examples,
    example_prompt=example_prompt,
)

final_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', """You are an expert at science knowledge. Your task is to step back and paraphrase a question to a more generic step-back question, which is easier to answer. Here are a few examples:"""),
        few_shot_prompt,
        ('user', '{question}'),
    ]
)

final_prompt.format(question= "What are the causes of an earthquake?")

"System: You are an expert at science knowledge. Your task is to step back and paraphrase a question to a more generic step-back question, which is easier to answer. Here are a few examples:\nHuman: What happens to the pressure, P, of an ideal gas if the temperature is increased by a factor of 2 and the volume is increased by a factor of 8?\nAI: What are the physics principles behind this question?\nHuman: Estella Leopold went to which school between Aug 1954 and Nov 1954?\nAI: What was Estella Leopold's education history?\nHuman: What are the causes of an earthquake?"

In [42]:
step_back_query_chain = (
    {'question': RunnablePassthrough()}
    | final_prompt 
    | ChatOpenAI(model='gpt-4o-mini', temperature=0.7) 
    | StrOutputParser()
    )

step_back_query_chain.invoke("What are the causes of an earthquake?")

'What are the fundamental factors that lead to geological events like earthquakes?'

In [43]:
response_prompt_template = """You are an expert in science. 
I am going to ask you a question. Your response should be comprehensive and should not contradict the following context if it is relevant. 
Otherwise, ignore the context if it is not relevant.

# {normal_context}
# {step_back_context}

# Original Question: {question}
# Answer:"""
response_prompt = ChatPromptTemplate.from_template(response_prompt_template)

step_back_chain = (
    {'normal_context': RunnablePassthrough() | retriever,
     'step_back_context': RunnablePassthrough() | step_back_query_chain | retriever,
     'question': RunnablePassthrough()
     }
    | response_prompt
    | ChatOpenAI(model='gpt-4o-mini', temperature=0)
    | StrOutputParser()
)

In [44]:
step_back_chain.invoke("What are the causes of an earthquake?")

"Earthquakes are primarily caused by disturbances in the Earth's crust due to the movement of tectonic plates. These plates are constantly in motion, but they can become stuck at their edges due to friction. As they continue to move, tension builds up in the rocks along these faults. When the stress exceeds the strength of the rocks, the plates suddenly break free, resulting in an earthquake.\n\nThe point where the earthquake rupture begins is known as the hypocenter or focus, while the point directly above it on the Earth's surface is called the epicenter.\n\nThere are also other causes of earthquakes, although they are less common. For instance, the largest earthquake in U.S. history, the 1811–1812 New Madrid earthquakes, was attributed to deep shifts in ancient rock formations, though the specific details of the cause remain unclear. Additionally, in areas with intense mining activity, the collapse of underground mine roofs can lead to minor tremors known as collapse earthquakes.\n\

# 3. HyDE (Hypothetical Document Embeddings)

Instead of generating alternative queries from the original question, HyDE generates a hypothetical document for the query. The intuition is that the embedding of such a document lands in a neighborhood of the corpus embedding space where similar real documents live, so those documents can be retrieved by vector similarity. RAG can then retrieve more relevant documents using the hypothetical document and answer the user query more accurately.
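
A toy example can make this intuition tangible. The sketch below is not the notebook's pipeline: it replaces the embedding model with simple word counts (`toy_embed`, a hypothetical stand-in) and uses a hand-written passage in place of the LLM-generated hypothetical document. All three texts are invented for illustration.

```python
import math
import re
from collections import Counter


def toy_embed(text: str) -> Counter:
    """Toy 'embedding': lowercase word counts (stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine_similarity(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[term] * b[term] for term in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


corpus_chunk = (
    "Tectonic plates move slowly. When they get stuck, tension builds up "
    "and is released suddenly."
)
query = "Why does the ground shake?"
# Hand-written stand-in for what an LLM might generate for the query
hypothetical_doc = (
    "The ground shakes when tectonic plates get stuck, tension builds up, "
    "and the plates suddenly slip."
)

chunk_vec = toy_embed(corpus_chunk)
query_similarity = cosine_similarity(toy_embed(query), chunk_vec)
hyde_similarity = cosine_similarity(toy_embed(hypothetical_doc), chunk_vec)
print(f"query vs chunk:            {query_similarity:.2f}")
print(f"hypothetical doc vs chunk: {hyde_similarity:.2f}")
```

The short query shares no vocabulary with the chunk, while the hypothetical passage uses the same terms as the corpus and therefore scores higher. Real embedding models capture meaning rather than exact words, so the gap is usually less extreme, but the direction of the effect is what HyDE relies on.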

In [45]:
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI


# Load environment variables from .env file
load_dotenv()


# Ensure the OpenAI API key is set in the environment
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

In [46]:
from langchain import hub
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

In [47]:
import nest_asyncio

nest_asyncio.apply()

In [48]:
from langchain_community.document_loaders import AsyncChromiumLoader

urls = ["https://simple.wikipedia.org/wiki/Earthquake"]
loader = AsyncChromiumLoader(urls)
docs = loader.load()

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [49]:
from langchain_community.document_transformers import Html2TextTransformer

html2text = Html2TextTransformer()
docs_transformed = html2text.transform_documents(docs)

In [50]:
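# Split text into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=20)
text_chunks = text_splitter.split_documents(docs_transformed)

# Note: from_documents adds to whatever is already stored in persist_directory.
# Re-running this cell against the same "data/science" folder stores the same
# chunks again, which is the likely reason identical documents appear in the
# retrieval results below.
vectorstore = Chroma.from_documents(documents=text_chunks,
                                    embedding=OpenAIEmbeddings(),
                                    persist_directory="data/science")
vectorstore.persist()

retriever = vectorstore.as_retriever(search_kwargs={'k': 5})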

In [51]:
from langchain.prompts import ChatPromptTemplate

hyde_prompt = ChatPromptTemplate.from_template(
    """
    Please write a short blog to answer the following question:\n
    Question: {question}\n
    Passage: 
    """
)

generate_doc_chain = (
    {'question': RunnablePassthrough()}
    | hyde_prompt
    | ChatOpenAI(model='gpt-4o-mini',temperature=0)
    | StrOutputParser()
)

question = "What are the causes of an earthquake?"
generate_doc_chain.invoke(question)



In [52]:
# Retrieve real chunks that are close to the generated hypothetical document
retrieval_chain = generate_doc_chain | retriever
# generate_doc_chain wraps its input as {'question': ...}, so pass the raw string
retrieved_docs = retrieval_chain.invoke(question)
retrieved_docs

[Document(metadata={'source': 'https://simple.wikipedia.org/wiki/Earthquake'}, page_content="Disturbances in the Earth cause earthquakes. Different tectonic plates are\nslowly moving. When they get stuck, tension builds up in them. Earthquakes\noccur when tectonic plates suddenly break free, so they start moving quickly.\nThe first point of an earthquake's rupture is called its hypocenter or focus.\nThe epicenter is the point at ground level directly above the hypocenter."),
 Document(metadata={'source': 'https://simple.wikipedia.org/wiki/Earthquake'}, page_content="Disturbances in the Earth cause earthquakes. Different tectonic plates are\nslowly moving. When they get stuck, tension builds up in them. Earthquakes\noccur when tectonic plates suddenly break free, so they start moving quickly.\nThe first point of an earthquake's rupture is called its hypocenter or focus.\nThe epicenter is the point at ground level directly above the hypocenter."),
 Document(metadata={'source': 'https://s

In [53]:
template = """Answer the following question based on the provided context:

{context}

Question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)

final_rag_chain = (
    prompt
    | ChatOpenAI(model='gpt-4o-mini',temperature=0)
    | StrOutputParser()
)

final_rag_chain.invoke({"context":retrieved_docs,"question":question})

'Earthquakes are caused by disturbances in the Earth, primarily due to the movement of tectonic plates. These plates are slowly moving, and when they get stuck, tension builds up in them. Earthquakes occur when these tectonic plates suddenly break free and start moving quickly. Additionally, earthquakes can be caused by tectonic plates riding over one another, which can lead to orogeny (mountain building) and severe earthquakes.'
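
The cells above follow the HyDE pattern (the prompt is named `hyde_prompt`): the LLM first writes a hypothetical passage, and that passage, rather than the question itself, is passed to the retriever. The toy sketch below hints at why this can help. Everything in it is an assumption made for illustration: `fake_llm_passage` returns a canned string instead of calling a model, and word-set Jaccard overlap stands in for embedding similarity.

```python
from typing import List, Set


def words(text: str) -> Set[str]:
    """Lowercased word set with basic punctuation stripped (toy stand-in for an embedding)."""
    return {w.strip(".,?!").lower() for w in text.split() if w.strip(".,?!")}


def jaccard(a: str, b: str) -> float:
    """Word-overlap similarity between two texts, in [0, 1]."""
    wa, wb = words(a), words(b)
    union = wa | wb
    return len(wa & wb) / len(union) if union else 0.0


def fake_llm_passage(question: str) -> str:
    """Hypothetical stand-in for the LLM call; returns a canned passage."""
    return ("Earthquakes happen when tectonic plates get stuck, tension builds up, "
            "and the plates suddenly break free.")


# Two made-up chunks standing in for the vector store contents
chunks = [
    "Tectonic plates are slowly moving. When they get stuck, tension builds up in them.",
    "A thunderstorm is a storm with lightning and thunder.",
]


def retrieve(query_text: str, k: int = 1) -> List[str]:
    """Return the k chunks most similar to query_text."""
    return sorted(chunks, key=lambda c: jaccard(query_text, c), reverse=True)[:k]


question = "What are the causes of an earthquake?"
hypothetical = fake_llm_passage(question)
top = retrieve(hypothetical)
print(top)
print(jaccard(question, chunks[0]), jaccard(hypothetical, chunks[0]))
```

In this toy setup the hypothetical passage shares far more vocabulary with the relevant chunk than the short question does, which is the intuition behind embedding a generated answer instead of the raw query.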

# 4. Routing


When we have multiple data sources, such as a graph database and a vector store built from PDF documents, each user query must be answered from the correct source. For example, if the user wants to know about reviews of a hospital, the query should be directed to the vector store containing embeddings of hospital reviews. If instead the user asks about doctors, patients, or their visits to the hospital, the query should probably be sent to a graph database that contains the hospital information. To provide this functionality, we now focus on “Routing” in RAG with LangChain.
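
The hospital example can be sketched as a small dispatch table. This is a minimal illustration, not a LangChain API: the handler functions, the `HANDLERS` mapping, and the keyword rule in `pick_datasource` are all hypothetical, and in practice the choice would be made by an LLM or by embedding similarity.

```python
from typing import Callable, Dict


def query_vector_store(question: str) -> str:
    """Placeholder for a similarity search over hospital review embeddings."""
    return f"[vector store] {question}"


def query_graph_db(question: str) -> str:
    """Placeholder for a graph database lookup (doctors, patients, visits)."""
    return f"[graph database] {question}"


# Map each datasource label to the function that answers from it
HANDLERS: Dict[str, Callable[[str], str]] = {
    "reviews": query_vector_store,
    "records": query_graph_db,
}


def pick_datasource(question: str) -> str:
    """Hypothetical keyword rule standing in for an LLM- or embedding-based router."""
    return "reviews" if "review" in question.lower() else "records"


def route(question: str) -> str:
    """Choose a datasource label, then call the matching handler."""
    return HANDLERS[pick_datasource(question)](question)


print(route("What do reviews say about the hospital?"))
print(route("Which doctors treated patient 12?"))
```

Keeping the decision (`pick_datasource`) separate from the handlers means the routing rule can later be swapped for a model-based one without touching the code that queries each source.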

In [54]:
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI


# Load environment variables from .env file
load_dotenv()


# Ensure the OpenAI API key is set in the environment
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

In [55]:
from langchain import hub
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

In [56]:
import nest_asyncio

nest_asyncio.apply()

In [57]:
from langchain_community.document_loaders import AsyncChromiumLoader
from langchain_community.document_transformers import Html2TextTransformer

In [75]:
def generate_vectorstores(url, persist_directory):
    # Load the page with Chromium (via Playwright) and convert the HTML to plain text
    loader = AsyncChromiumLoader([url])
    docs = loader.load()
    html2text = Html2TextTransformer()
    docs_transformed = html2text.transform_documents(docs)

    # Split text into chunks
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=20)
    text_chunks = text_splitter.split_documents(docs_transformed)

    # Embed the chunks and persist the store on disk
    vectorstore = Chroma.from_documents(documents=text_chunks,
                                        embedding=OpenAIEmbeddings(),
                                        persist_directory=persist_directory)
    vectorstore.persist()
    return vectorstore


# Create a vectorstore to answer questions about Earthquake
vectorstore_earthquake = generate_vectorstores("https://simple.wikipedia.org/wiki/Earthquake", "data/science_earthquake")

# Create a vectorstore to answer questions about Thunderstorm
vectorstore_thunderstorm = generate_vectorstores("https://en.wikipedia.org/wiki/Thunderstorm", "data/science_thunderstorm")

USER_AGENT environment variable not set, consider setting it to identify your requests.
USER_AGENT environment variable not set, consider setting it to identify your requests.


In [76]:
retriever_earthquake = vectorstore_earthquake.as_retriever(search_kwargs={'k':5})
retriever_thunderstorm = vectorstore_thunderstorm.as_retriever(search_kwargs={'k':5})

## 4.1. Logical Routing

In [77]:
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from typing import Literal

class QueryRouter(BaseModel):
    
    """Route a user query to the appropriate datasource that will help answer the query accurately"""
    
    datasource: Literal['earthquake', 'thunderstorm', 'general'] = Field(..., 
                                                description="Given a user question choose which datasource would be most relevant for answering their question"
                                                )
    question: str = Field(..., description="User question to be routed to the appropriate datasource")
    
llm = ChatOpenAI(model='gpt-4o-mini',temperature=0)
structured_llm = llm.with_structured_output(QueryRouter)

router_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are an expert router that can direct user queries to the appropriate datasource. Route the following user question about a topic in earthquake and thunderstorm to the appropriate datasource.\nIf it is a general question not related to the provided datasources, route it to the general datasource.\n"),
        ("user", "{question}")
    ]
)

router = (
    {'question': RunnablePassthrough()}
    | router_prompt
    | structured_llm
)

In [80]:
question = "Why does the thunderstorm happen?"
result = router.invoke(question)
result

QueryRouter(datasource='thunderstorm', question='Why does the thunderstorm happen?')

In [82]:
qa_prompt = hub.pull('rlm/rag-prompt')


def choose_route(result):
    
    llm_route = ChatOpenAI(model='gpt-4o-mini',temperature=0)
    if "thunderstorm" in result.datasource.lower():
        print(f"> Asking about thunderstorm ...\nQuestion: {result.question}\nAnswer:")
        thunderstorm_chain = (
            {'context': retriever_thunderstorm, 'question': RunnablePassthrough()}
            | qa_prompt
            | llm_route
            | StrOutputParser()
        )
        return thunderstorm_chain.invoke(result.question)
    elif "earthquake" in result.datasource.lower():
        print(f"> Asking about earthquake ...\nQuestion: {result.question}\nAnswer:")
        earthquake_chain = (
            {'context': retriever_earthquake, 'question': RunnablePassthrough()}
            | qa_prompt
            | llm_route
            | StrOutputParser()
        )
        return earthquake_chain.invoke(result.question)
    else:
        print(f"> Asking about a general question ...\nQuestion: {result.question}\nAnswer:")
        general_chain = llm_route | StrOutputParser()
        return general_chain.invoke(result.question)

from langchain_core.runnables import RunnableLambda

full_chain = router | RunnableLambda(choose_route)

In [83]:
full_chain.invoke("How to proof earthquake?")

> Asking about earthquake ...
Question: How to proof earthquake?
Answer:


"To prove an earthquake, one can measure its effects using a seismometer, which detects and records the tremors on a seismograph. The strength or magnitude of the earthquake can be quantified using the Richter scale. Additionally, studying the tectonic movements and the resulting energy waves can provide evidence of an earthquake's occurrence."

## 4.2. Semantic Routing

In [84]:
physics_template = """You are a very smart physics professor. \
You are great at answering questions about physics in a concise and easy to understand manner. \
When you don't know the answer to a question you admit that you don't know.

Here is a question:
{question}"""

math_template = """You are a very good mathematician. You are great at answering math questions. \
You are so good because you are able to break down hard problems into their component parts, \
answer the component parts, and then put them together to answer the broader question.

Here is a question:
{question}"""

In [85]:
embeddings = OpenAIEmbeddings()
routes = [physics_template, math_template]
route_embeddings = embeddings.embed_documents(routes)
len(route_embeddings)

2

In [86]:
from langchain.utils.math import cosine_similarity
from langchain.prompts import PromptTemplate

def router(input):
    # Generate embeddings for the user query
    query_embedding = embeddings.embed_query(input['question'])
    # Cosine similarity between the user query and each of the two route prompts
    similarity = cosine_similarity([query_embedding], route_embeddings)[0]
    # Find the route that gives the maximum similarity score
    route_id = similarity.argmax()
    if route_id == 0:
        print(f"> Asking a physics question ...\nQuestion: {input['question']}\nAnswer:")
    else:
        print(f"> Asking a math question ...\nQuestion: {input['question']}\nAnswer:")
        
    return PromptTemplate.from_template(routes[route_id])

In [87]:
semantic_router_chain = (
    {'question': RunnablePassthrough()}
    | RunnableLambda(router)
    | ChatOpenAI(model='gpt-4o-mini',temperature=0)
    | StrOutputParser()
)

semantic_router_chain.invoke("What is the formula for the area of a circle?")

> Asking a math question ...
Question: What is the formula for the area of a circle?
Answer:


'The formula for the area of a circle is given by:\n\n\\[\nA = \\pi r^2\n\\]\n\nwhere:\n- \\( A \\) is the area of the circle,\n- \\( r \\) is the radius of the circle, and\n- \\( \\pi \\) (pi) is a mathematical constant approximately equal to 3.14159.\n\nTo break it down:\n1. The radius \\( r \\) is the distance from the center of the circle to any point on its circumference.\n2. The area \\( A \\) represents the total space enclosed within the circle.\n\nSo, to find the area, you square the radius (multiply it by itself) and then multiply that result by \\( \\pi \\).'
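
The decision inside the semantic router comes down to a cosine similarity followed by an argmax. The sketch below reproduces that step with the standard library only. The three-dimensional vectors are made-up numbers standing in for real embeddings, and `pick_route` is a hypothetical helper, not part of LangChain.

```python
import math
from typing import List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


route_names: List[str] = ["physics", "math"]
# Made-up vectors standing in for the embeddings of the two prompt templates
route_vectors: List[List[float]] = [[0.9, 0.1, 0.2], [0.1, 0.95, 0.1]]


def pick_route(query_vector: Sequence[float]) -> str:
    """Return the name of the route whose vector is most similar to the query."""
    scores = [cosine_similarity(query_vector, v) for v in route_vectors]
    best = max(range(len(scores)), key=scores.__getitem__)  # argmax over the scores
    return route_names[best]


print(pick_route([0.2, 0.8, 0.1]))
print(pick_route([0.8, 0.1, 0.3]))
```

Because the route is always the argmax, every query is sent to one of the two prompts even when neither is a good match; a minimum-similarity cutoff with a fallback route is one possible extension.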

# 5. Query Construction

After the user asks a question in natural language and it has been routed to a specific datasource (e.g., a vector store or a graph database), the question must be transformed into a structured query that can retrieve information from that datasource (e.g., Text-to-SQL or Text-to-Cypher).

In [1]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


# Ensure the OpenAI API key is set in the environment
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

In [2]:
from langchain_community.document_loaders import YoutubeLoader
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough

In [5]:
#pip install youtube-transcript-api

Collecting youtube-transcript-api
  Downloading youtube_transcript_api-0.6.2-py3-none-any.whl.metadata (15 kB)
Downloading youtube_transcript_api-0.6.2-py3-none-any.whl (24 kB)
Installing collected packages: youtube-transcript-api
Successfully installed youtube-transcript-api-0.6.2
Note: you may need to restart the kernel to use updated packages.


In [4]:
#pip install --quiet pytube

Note: you may need to restart the kernel to use updated packages.


In [3]:
docs = YoutubeLoader.from_youtube_url(
    "https://www.youtube.com/watch?v=skPtWocTKdU", add_video_info=True
).load()

docs[0].metadata

{'source': 'skPtWocTKdU',
 'title': 'Immune Response to Bacteria',
 'description': 'Unknown',
 'view_count': 223725,
 'thumbnail_url': 'https://i.ytimg.com/vi/skPtWocTKdU/hqdefault.jpg?sqp=-oaymwEXCJADEOABSFryq4qpAwkIARUAAIhCGAE=&rs=AOn4CLBzYziRt2l-7nYT-3QdarBKhiyNcg',
 'publish_date': '2012-01-05 00:00:00',
 'length': 107,
 'author': 'NIAID'}

In [4]:
import datetime
from typing import Optional
from langchain_core.pydantic_v1 import BaseModel, Field

class TutorialSearch(BaseModel):
    """Search over a database/playlist of videos about the immune system and bacteria."""

    content_search: str = Field(
        ...,
        description="Similarity search query applied to video transcripts.",
    )
    title_search: str = Field(
        ...,
        description=(
            "Alternate version of the content search query to apply to video titles. "
            "Should be succinct and only include key words that could be in a video "
            "title."
        ),
    )
    min_view_count: Optional[int] = Field(
        None,
        description="Minimum view count filter, inclusive. Only use if explicitly specified.",
    )
    max_view_count: Optional[int] = Field(
        None,
        description="Maximum view count filter, exclusive. Only use if explicitly specified.",
    )
    earliest_publish_date: Optional[datetime.date] = Field(
        None,
        description="Earliest publish date filter, inclusive. Only use if explicitly specified.",
    )
    latest_publish_date: Optional[datetime.date] = Field(
        None,
        description="Latest publish date filter, exclusive. Only use if explicitly specified.",
    )
    min_length_sec: Optional[int] = Field(
        None,
        description="Minimum video length in seconds, inclusive. Only use if explicitly specified.",
    )
    max_length_sec: Optional[int] = Field(
        None,
        description="Maximum video length in seconds, exclusive. Only use if explicitly specified.",
    )

    def pretty_print(self) -> None:
        for field in self.__fields__:
            if getattr(self, field) is not None and getattr(self, field) != getattr(
                self.__fields__[field], "default", None
            ):
                print(f"{field}: {getattr(self, field)}")

In [5]:
meta_data_prompt = ChatPromptTemplate.from_messages(
    [
        # Adjacent string literals are concatenated, which avoids the runs of
        # indentation spaces that a backslash continuation leaves inside the prompt.
        ("system", "You are an expert at converting user questions into database queries. "
                   "You have access to a database of tutorial videos about the immune system and bacteria. "
                   "Given a question, return a database query optimized to retrieve the most relevant results."
        ),
        ("user", "{question}")
    ]
)

llm = ChatOpenAI(model='gpt-4o-mini',temperature=0)
structured_llm = llm.with_structured_output(TutorialSearch)

meta_data_chain = (
    {'question': RunnablePassthrough()}
    | meta_data_prompt
    | structured_llm
)

query = meta_data_chain.invoke("Explain immune system response to bacteria videos published before January 2024 with at least 1000 views.")
query.pretty_print()

content_search: immune system response to bacteria
title_search: immune response bacteria
min_view_count: 1000
earliest_publish_date: 2023-01-01
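
Once a structured query exists, its optional fields can be applied as metadata filters before or alongside the similarity search. The sketch below shows one way to do that in plain Python, following the inclusive/exclusive wording of the `TutorialSearch` field descriptions. The `VideoFilter` dataclass, the `matches` helper, and the second and third records are hypothetical; the filter values are written by hand from the request wording rather than taken from the model output.

```python
from dataclasses import dataclass
from datetime import date
from typing import Dict, List, Optional


@dataclass
class VideoFilter:
    """Hypothetical subset of the TutorialSearch filter fields."""
    min_view_count: Optional[int] = None          # inclusive
    max_view_count: Optional[int] = None          # exclusive
    earliest_publish_date: Optional[date] = None  # inclusive
    latest_publish_date: Optional[date] = None    # exclusive


def matches(meta: Dict, f: VideoFilter) -> bool:
    """Return True when the metadata record passes every filter that is set."""
    if f.min_view_count is not None and meta["view_count"] < f.min_view_count:
        return False
    if f.max_view_count is not None and meta["view_count"] >= f.max_view_count:
        return False
    if f.earliest_publish_date is not None and meta["publish_date"] < f.earliest_publish_date:
        return False
    if f.latest_publish_date is not None and meta["publish_date"] >= f.latest_publish_date:
        return False
    return True


videos: List[Dict] = [
    # First record mirrors the metadata loaded earlier; the other two are made up
    {"title": "Immune Response to Bacteria", "view_count": 223725, "publish_date": date(2012, 1, 5)},
    {"title": "Hypothetical low-traffic video", "view_count": 500, "publish_date": date(2023, 6, 1)},
    {"title": "Hypothetical recent video", "view_count": 5000, "publish_date": date(2024, 3, 1)},
]

# "published before January 2024 with at least 1000 views", written by hand
video_filter = VideoFilter(min_view_count=1000, latest_publish_date=date(2024, 1, 1))
hits = [v["title"] for v in videos if matches(v, video_filter)]
print(hits)
```

Fields left as `None` are simply skipped, which matches the "Only use if explicitly specified" instruction in the field descriptions.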


# 6. Indexing

In RAG, the first step is to create a vector store that holds “chunks” of the provided documents. The chunks are stored in the vector database so that they can be retrieved easily and efficiently for a given query. This process is called indexing.

In [1]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


# Ensure the OpenAI API key is set in the environment
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

## 6.1. Multi-representation indexing

In [2]:
from langchain_core.documents import Document
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.prompts import ChatPromptTemplate
from langchain import hub

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [8]:
loader = WebBaseLoader("https://byjus.com/maths/numbers/#:~:text=Numbers%20Definition,symbols%20in%20a%20logical%20manner.")
docs = loader.load()

loader = WebBaseLoader("https://www.eia.gov/kids/energy-sources/electricity/science-of-electricity.php")
docs.extend(loader.load())

In [9]:
chain = (
    {"doc": lambda x: x.page_content}
    | ChatPromptTemplate.from_template("Summarize the following document:\n\n{doc}")
    | ChatOpenAI(model="gpt-4o-mini",max_retries=0)
    | StrOutputParser()
)

summaries = chain.batch(docs, {"max_concurrency": 5})

In [10]:
summaries[0]

'The document provides a comprehensive overview of numbers, including their definitions, types, properties, and examples. \n\n### Key Points:\n\n1. **Definition of Numbers**: Numbers are arithmetic values used for representing quantities and performing calculations. They can be expressed as numerals (e.g., 3) or in words (e.g., three).\n\n2. **Types of Numbers**:\n   - **Natural Numbers**: Positive integers starting from 1 (e.g., 1, 2, 3…).\n   - **Whole Numbers**: Non-negative integers including zero (e.g., 0, 1, 2…).\n   - **Integers**: Whole numbers that can be positive, negative, or zero (e.g., -3, -2, -1, 0, 1…).\n   - **Real Numbers**: All rational and irrational numbers, including fractions and decimals.\n   - **Rational Numbers**: Numbers that can be expressed as a fraction (e.g., 1/2, 3/4).\n   - **Irrational Numbers**: Numbers that cannot be expressed as a fraction (e.g., √2).\n   - **Complex Numbers**: Numbers in the form of a + bi (where i is the imaginary unit).\n   - **Im

In [12]:
summaries[1]

'The document from the U.S. Energy Information Administration (EIA) focuses on the science of electricity, providing a comprehensive overview of its fundamental concepts and applications. Key points include:\n\n1. **Atoms and Electricity**: Everything is made up of atoms, which consist of protons, neutrons, and electrons. Protons have a positive charge, electrons have a negative charge, and neutrons are neutral. Electricity is defined as the movement of electrons between atoms.\n\n2. **Static Electricity**: Natural occurrences like lightning and everyday experiences, such as static shocks, illustrate the principles of electricity. When electrons move, they create static electricity.\n\n3. **Magnets and Electricity**: The relationship between magnets and electricity is explained, noting that the movement of magnetic fields can induce electric currents. This principle is harnessed in electricity generation.\n\n4. **Batteries and Circuits**: Batteries produce electricity through chemical 

In [15]:
from langchain.storage import InMemoryByteStore
from langchain.retrievers.multi_vector import MultiVectorRetriever
import uuid

docstore = InMemoryByteStore()  # Stores the full documents
vectorstore = Chroma(collection_name="summaries", embedding_function=OpenAIEmbeddings())  # Stores the embeddings of the document summaries

# IDs that map summaries to their source documents
doc_ids = [str(uuid.uuid4()) for _ in docs]

# Create documents from the summaries
summary_docs = [Document(page_content=s, metadata={"doc_id": doc_id}) for s, doc_id in zip(summaries, doc_ids)]

# Create the retriever
retriever = MultiVectorRetriever(
    vectorstore=vectorstore,
    byte_store=docstore,
    id_key="doc_id"
)

# Add summaries to the vectorstore
retriever.vectorstore.add_documents(summary_docs)

# Add documents to the docstore
retriever.docstore.mset(list(zip(doc_ids, docs)))

In [16]:
query = "Memory in numbers"
sub_docs = vectorstore.similarity_search(query,k=1)
sub_docs[0]

Document(metadata={'doc_id': '2dee931d-0e91-43fe-9646-70d0dfa272b4'}, page_content='The document provides a comprehensive overview of numbers, including their definitions, types, properties, and examples. \n\n### Key Points:\n\n1. **Definition of Numbers**: Numbers are arithmetic values used for representing quantities and performing calculations. They can be expressed as numerals (e.g., 3) or in words (e.g., three).\n\n2. **Types of Numbers**:\n   - **Natural Numbers**: Positive integers starting from 1 (e.g., 1, 2, 3…).\n   - **Whole Numbers**: Non-negative integers including zero (e.g., 0, 1, 2…).\n   - **Integers**: Whole numbers that can be positive, negative, or zero (e.g., -3, -2, -1, 0, 1…).\n   - **Real Numbers**: All rational and irrational numbers, including fractions and decimals.\n   - **Rational Numbers**: Numbers that can be expressed as a fraction (e.g., 1/2, 3/4).\n   - **Irrational Numbers**: Numbers that cannot be expressed as a fraction (e.g., √2).\n   - **Complex N

In [17]:
retrieved_docs = retriever.invoke(query)
len(retrieved_docs[0].page_content)

36681
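
The last two cells show the core of multi-representation indexing: the search runs over short summaries, but the retriever hands back the full document that shares the summary's `doc_id`. The sketch below mimics that lookup with plain dictionaries. The texts are made up, and word overlap stands in for embedding similarity, so it illustrates the idea rather than how `MultiVectorRetriever` is implemented.

```python
import uuid
from typing import Dict, List, Tuple

# Made-up full documents and their summaries (assumptions, for illustration)
full_docs: List[str] = [
    "Long article text about numbers ...",
    "Long article text about electricity ...",
]
summaries: List[str] = [
    "overview of numbers natural whole integers rational",
    "science of electricity atoms electrons circuits",
]

doc_ids = [str(uuid.uuid4()) for _ in full_docs]
# The summaries are what gets searched ...
summary_index: List[Tuple[str, str]] = list(zip(summaries, doc_ids))
# ... and the full documents are what gets returned
docstore: Dict[str, str] = dict(zip(doc_ids, full_docs))


def word_overlap(a: str, b: str) -> int:
    """Number of distinct words shared by two texts (toy similarity measure)."""
    return len(set(a.lower().split()) & set(b.lower().split()))


def retrieve_full_doc(query: str) -> str:
    """Find the best-matching summary, then return the full document behind it."""
    _, best_id = max(summary_index, key=lambda pair: word_overlap(query, pair[0]))
    return docstore[best_id]


print(retrieve_full_doc("integers and rational numbers"))
print(retrieve_full_doc("electrons in circuits"))
```

Searching over compact summaries keeps the indexed text focused, while returning the full document gives the LLM the complete context, which is why the retrieved `page_content` above is so much longer than the summary.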

## 6.2. RAPTOR (Recursive Abstractive Processing for Tree-Organized Retrieval)

### 6.2.1. Loading the documents

In [3]:
earthquake_doc = WebBaseLoader("https://simple.wikipedia.org/wiki/Earthquake")
docs = earthquake_doc.load()

thunderstorm_doc = WebBaseLoader("https://en.wikipedia.org/wiki/Thunderstorm")
docs.extend(thunderstorm_doc.load())

In [4]:
sorted_docs = sorted(docs, key=lambda x: x.metadata["source"])
d_reversed = list(reversed(sorted_docs))

concatenated_content = "\n\n\n --- \n\n\n".join(
    [doc.page_content for doc in d_reversed]
)

In [5]:
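# Despite the "_tok" suffix, chunk_size is measured in characters here:
# RecursiveCharacterTextSplitter counts length with len() unless a token-based length function is supplied.
chunk_size_tok = 2000
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=chunk_size_tok, chunk_overlap=0
)
texts_split = text_splitter.split_text(concatenated_content)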

In [6]:
embeddings = OpenAIEmbeddings()

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

### 6.2.2. Tree construction

In [1]:
#pip install --quiet umap-learn==0.5.2

Note: you may need to restart the kernel to use updated packages.
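
The clustering helpers in this section use soft assignment: a text is placed in every cluster whose membership probability exceeds a threshold, so one text can belong to several clusters or to none. The sketch below isolates that rule in plain Python. The probability rows are made-up numbers, and `assign_clusters` is a hypothetical helper that mirrors the thresholding step, not the Gaussian mixture fit itself.

```python
from typing import List


def assign_clusters(probabilities: List[List[float]], threshold: float) -> List[List[int]]:
    """For each row, keep the indices of all clusters whose probability exceeds the threshold."""
    return [[k for k, p in enumerate(row) if p > threshold] for row in probabilities]


# Made-up membership probabilities for three texts over three clusters
probs = [
    [0.95, 0.05, 0.00],  # clearly in cluster 0
    [0.55, 0.40, 0.05],  # shared between clusters 0 and 1
    [0.05, 0.15, 0.80],  # mostly cluster 2, with some weight on cluster 1
]

labels = assign_clusters(probs, threshold=0.1)
print(labels)
```

With a low threshold such as 0.1, borderline texts are summarized in more than one cluster, which lets the same passage contribute to several higher-level summaries.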


In [7]:
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import umap.umap_ as umap
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from sklearn.mixture import GaussianMixture

RANDOM_SEED = 224  # Fixed seed for reproducibility

### --- Code from citations referenced above (added comments and docstrings) --- ###


def global_cluster_embeddings(
    embeddings: np.ndarray,
    dim: int,
    n_neighbors: Optional[int] = None,
    metric: str = "cosine",
) -> np.ndarray:
    """
    Perform global dimensionality reduction on the embeddings using UMAP.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - dim: The target dimensionality for the reduced space.
    - n_neighbors: Optional; the number of neighbors to consider for each point.
                   If not provided, it defaults to the square root of the number of embeddings.
    - metric: The distance metric to use for UMAP.

    Returns:
    - A numpy array of the embeddings reduced to the specified dimensionality.
    """
    if n_neighbors is None:
        n_neighbors = int((len(embeddings) - 1) ** 0.5)
    return umap.UMAP(
        n_neighbors=n_neighbors, n_components=dim, metric=metric
    ).fit_transform(embeddings)


def local_cluster_embeddings(
    embeddings: np.ndarray, dim: int, num_neighbors: int = 10, metric: str = "cosine"
) -> np.ndarray:
    """
    Perform local dimensionality reduction on the embeddings using UMAP, typically after global clustering.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - dim: The target dimensionality for the reduced space.
    - num_neighbors: The number of neighbors to consider for each point.
    - metric: The distance metric to use for UMAP.

    Returns:
    - A numpy array of the embeddings reduced to the specified dimensionality.
    """
    return umap.UMAP(
        n_neighbors=num_neighbors, n_components=dim, metric=metric
    ).fit_transform(embeddings)


def get_optimal_clusters(
    embeddings: np.ndarray, max_clusters: int = 50, random_state: int = RANDOM_SEED
) -> int:
    """
    Determine the optimal number of clusters using the Bayesian Information Criterion (BIC) with a Gaussian Mixture Model.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - max_clusters: The maximum number of clusters to consider.
    - random_state: Seed for reproducibility.

    Returns:
    - An integer representing the optimal number of clusters found.
    """
    max_clusters = min(max_clusters, len(embeddings)) # Maximum number of clusters is limited by the number of embeddings
    n_clusters = np.arange(1, max_clusters) # Candidate cluster counts: 1 to max_clusters - 1 (np.arange excludes the upper bound)
    bics = []
    for n in n_clusters:
        gm = GaussianMixture(n_components=n, random_state=random_state) # For each number of clusters (i.e., number of mixture components) n, calculate gaussian mixture distribution parameters 
        gm.fit(embeddings)
        bics.append(gm.bic(embeddings)) # Calculate the Bayesian Information Criterion (BIC) for the current number of clusters
    return n_clusters[np.argmin(bics)] # Return the number of clusters that minimized the BIC


def GMM_cluster(embeddings: np.ndarray, threshold: float, random_state: int = 0):
    """
    Cluster embeddings using a Gaussian Mixture Model (GMM) based on a probability threshold.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - threshold: The probability threshold for assigning an embedding to a cluster.
    - random_state: Seed for reproducibility.

    Returns:
    - A tuple containing the cluster labels and the number of clusters determined.
    """
    n_clusters = get_optimal_clusters(embeddings) # Determine the optimal number of clusters using BIC
    gm = GaussianMixture(n_components=n_clusters, random_state=random_state)
    gm.fit(embeddings) # Fit the Gaussian mixture distribution with parameters related to the optimal number of clusters to the embeddings
    probs = gm.predict_proba(embeddings) # Calculate the probabilities of each embedding belonging to each cluster
    labels = [np.where(prob > threshold)[0] for prob in probs] # Assign embeddings to clusters based on the threshold
    return labels, n_clusters


def perform_clustering(
    embeddings: np.ndarray,
    dim: int,
    threshold: float,
) -> List[np.ndarray]:
    """
    Perform clustering on the embeddings by first reducing their dimensionality globally, then clustering
    using a Gaussian Mixture Model, and finally performing local clustering within each global cluster.

    Parameters:
    - embeddings: The input embeddings as a numpy array.
    - dim: The target dimensionality for UMAP reduction.
    - threshold: The probability threshold for assigning an embedding to a cluster in GMM.

    Returns:
    - A list of numpy arrays, where each array contains the cluster IDs for each embedding.
    """
    if len(embeddings) <= dim + 1:
        # Avoid clustering when there's insufficient data
        return [np.array([0]) for _ in range(len(embeddings))]

    # Global dimensionality reduction
    reduced_embeddings_global = global_cluster_embeddings(embeddings, dim)
    # Global clustering
    global_clusters, n_global_clusters = GMM_cluster(
        reduced_embeddings_global, threshold
    )

    all_local_clusters = [np.array([]) for _ in range(len(embeddings))]
    total_clusters = 0

    # Iterate through each global cluster to perform local clustering
    for i in range(n_global_clusters):
        # Extract embeddings belonging to the current global cluster
        global_cluster_embeddings_ = embeddings[
            np.array([i in gc for gc in global_clusters])
        ]

        if len(global_cluster_embeddings_) == 0:
            continue
        if len(global_cluster_embeddings_) <= dim + 1:
            # Handle small clusters with direct assignment
            local_clusters = [np.array([0]) for _ in global_cluster_embeddings_]
            n_local_clusters = 1
        else:
            # Local dimensionality reduction and clustering
            reduced_embeddings_local = local_cluster_embeddings(
                global_cluster_embeddings_, dim
            )
            local_clusters, n_local_clusters = GMM_cluster(
                reduced_embeddings_local, threshold
            )

        # Assign local cluster IDs, adjusting for total clusters already processed
        for j in range(n_local_clusters):
            local_cluster_embeddings_ = global_cluster_embeddings_[
                np.array([j in lc for lc in local_clusters])
            ]
            indices = np.where(
                (embeddings == local_cluster_embeddings_[:, None]).all(-1)
            )[1]
            for idx in indices:
                all_local_clusters[idx] = np.append(
                    all_local_clusters[idx], j + total_clusters
                )

        total_clusters += n_local_clusters

    return all_local_clusters


def embed(texts):
    """
    Generate embeddings for a list of text documents.

    This function relies on the module-level `embeddings` object (an `OpenAIEmbeddings` instance),
    whose `embed_documents` method takes a list of texts and returns their embeddings.

    Parameters:
    - texts: List[str], a list of text documents to be embedded.

    Returns:
    - numpy.ndarray: An array of embeddings for the given text documents.
    """
    text_embeddings = embeddings.embed_documents(texts)
    text_embeddings_np = np.array(text_embeddings)
    return text_embeddings_np


def embed_cluster_texts(texts):
    """
    Embeds a list of texts and clusters them, returning a DataFrame with texts, their embeddings, and cluster labels.

    This function combines embedding generation and clustering into a single step. It assumes the existence
    of a previously defined `perform_clustering` function that performs clustering on the embeddings.

    Parameters:
    - texts: List[str], a list of text documents to be processed.

    Returns:
    - pandas.DataFrame: A DataFrame containing the original texts, their embeddings, and the assigned cluster labels.
    """
    text_embeddings_np = embed(texts)  # Generate embeddings
    cluster_labels = perform_clustering(
        text_embeddings_np, 10, 0.1
    )  # Perform clustering on the embeddings
    df = pd.DataFrame()  # Initialize a DataFrame to store the results
    df["text"] = texts  # Store original texts
    df["embd"] = list(text_embeddings_np)  # Store embeddings as a list in the DataFrame
    df["cluster"] = cluster_labels  # Store cluster labels
    return df


def fmt_txt(df: pd.DataFrame) -> str:
    """
    Formats the text documents in a DataFrame into a single string.

    Parameters:
    - df: DataFrame containing the 'text' column with text documents to format.

    Returns:
    - A single string where all text documents are joined by a specific delimiter.
    """
    unique_txt = df["text"].tolist()
    return "--- --- \n --- --- ".join(unique_txt)


def embed_cluster_summarize_texts(
    texts: List[str], level: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Embeds, clusters, and summarizes a list of texts. This function first generates embeddings for the texts,
    clusters them based on similarity, expands the cluster assignments for easier processing, and then summarizes
    the content within each cluster.

    Parameters:
    - texts: A list of text documents to be processed.
    - level: The current level in the tree; it is recorded in the `level` column of the summary DataFrame.

    Returns:
    - Tuple containing two DataFrames:
      1. The first DataFrame (`df_clusters`) includes the original texts, their embeddings, and cluster assignments.
      2. The second DataFrame (`df_summary`) contains summaries for each cluster, the specified level of detail,
         and the cluster identifiers.
    """

    # Embed and cluster the texts, resulting in a DataFrame with 'text', 'embd', and 'cluster' columns
    df_clusters = embed_cluster_texts(texts)

    # Prepare to expand the DataFrame for easier manipulation of clusters
    expanded_list = []

    # Expand DataFrame entries to document-cluster pairings for straightforward processing
    for index, row in df_clusters.iterrows():
        for cluster in row["cluster"]:
            expanded_list.append(
                {"text": row["text"], "embd": row["embd"], "cluster": cluster}
            )

    # Create a new DataFrame from the expanded list
    expanded_df = pd.DataFrame(expanded_list)

    # Retrieve unique cluster identifiers for processing
    all_clusters = expanded_df["cluster"].unique()

    print(f"--Generated {len(all_clusters)} clusters--")

    # Summarization
    template = """Here is a subset of a larger set of documents.
    
    Give a detailed summary of the documents provided.
    
    Documentation:
    {context}
    """
    prompt = ChatPromptTemplate.from_template(template)
    chain = prompt | llm | StrOutputParser()

    # Format text within each cluster for summarization
    summaries = []
    for i in all_clusters:
        df_cluster = expanded_df[expanded_df["cluster"] == i]
        formatted_txt = fmt_txt(df_cluster)
        summaries.append(chain.invoke({"context": formatted_txt}))

    # Create a DataFrame to store summaries with their corresponding cluster and level
    df_summary = pd.DataFrame(
        {
            "summaries": summaries,
            "level": [level] * len(summaries),
            "cluster": list(all_clusters),
        }
    )

    return df_clusters, df_summary


def recursive_embed_cluster_summarize(
    texts: List[str], level: int = 1, n_levels: int = 3
) -> Dict[int, Tuple[pd.DataFrame, pd.DataFrame]]:
    """
    Recursively embeds, clusters, and summarizes texts up to a specified level or until
    the number of unique clusters becomes 1, storing the results at each level.

    Parameters:
    - texts: List[str], texts to be processed.
    - level: int, current recursion level (starts at 1).
    - n_levels: int, maximum depth of recursion.

    Returns:
    - Dict[int, Tuple[pd.DataFrame, pd.DataFrame]], a dictionary where keys are the recursion
      levels and values are tuples containing the clusters DataFrame and summaries DataFrame at that level.
    """
    results = {}  # Dictionary to store results at each level

    # Perform embedding, clustering, and summarization for the current level
    df_clusters, df_summary = embed_cluster_summarize_texts(texts, level)

    # Store the results of the current level
    results[level] = (df_clusters, df_summary)

    # Determine if further recursion is possible and meaningful
    unique_clusters = df_summary["cluster"].nunique()
    if level < n_levels and unique_clusters > 1:
        # Use summaries as the input texts for the next level of recursion
        new_texts = df_summary["summaries"].tolist()
        next_level_results = recursive_embed_cluster_summarize(
            new_texts, level + 1, n_levels
        )

        # Merge the results from the next level into the current results dictionary
        results.update(next_level_results)

    return results
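
The stopping rule in `recursive_embed_cluster_summarize` (recurse only while `level < n_levels` and more than one cluster remains) can be seen in isolation with a toy version. This is a minimal sketch under loud assumptions: `toy_cluster_summarize` is a hypothetical stand-in that pairs neighbouring texts and joins them, in place of real embedding, clustering, and LLM summarization.

```python
from typing import Dict, List


def toy_cluster_summarize(texts: List[str]) -> List[str]:
    """Hypothetical stand-in for embed + cluster + summarize:
    group neighbouring texts in pairs and join each pair into one 'summary'."""
    return [" + ".join(texts[i:i + 2]) for i in range(0, len(texts), 2)]


def toy_recursive_summarize(
    texts: List[str], level: int = 1, n_levels: int = 3
) -> Dict[int, List[str]]:
    """Same control flow as the recursive function above, with toy summaries."""
    summaries = toy_cluster_summarize(texts)
    results = {level: summaries}

    # Recurse only if depth remains and there is more than one cluster to merge
    if level < n_levels and len(summaries) > 1:
        results.update(toy_recursive_summarize(summaries, level + 1, n_levels))

    return results


tree = toy_recursive_summarize(["a", "b", "c", "d"])
for lvl in sorted(tree):
    print(lvl, tree[lvl])
```

Four leaf texts produce two summaries at level 1 and a single summary at level 2, at which point the recursion stops even though `n_levels` is 3.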

In [8]:
leaf_texts = [d.page_content for d in docs]
results = recursive_embed_cluster_summarize(leaf_texts, level=1, n_levels=3)

--Generated 1 clusters--


In [9]:
all_texts = leaf_texts.copy()

# Iterate through the results to extract summaries from each level and add them to all_texts
for level in sorted(results.keys()):
    # Extract summaries from the current level's DataFrame
    summaries = results[level][1]["summaries"].tolist()
    # Extend all_texts with the summaries from the current level
    all_texts.extend(summaries)

# Now, use all_texts to build the vectorstore with Chroma
vectorstore = Chroma.from_texts(texts=all_texts, embedding=embeddings)
retriever = vectorstore.as_retriever()

In [11]:
prompt = hub.pull("rlm/rag-prompt")


# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


# Chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# Question
rag_chain.invoke("What is the difference between earthquake swarms and earthquake storm?")

Number of requested results 4 is greater than number of elements in index 3, updating n_results = 3


'Earthquake swarms consist of multiple earthquakes occurring in a specific area over a short period without a clear main shock, while earthquake storms involve a series of earthquakes triggered by the stress redistribution from previous quakes, often along a fault line. In swarms, no single earthquake is significantly larger than the others, whereas in storms, later earthquakes can be as damaging as earlier ones. Both phenomena highlight different patterns of seismic activity but differ in their triggering mechanisms and characteristics.'

# 7. Retrieval

The basic RAG pipeline embeds a user query, retrieves the documents most relevant to that query, and passes those documents to an LLM, which generates an answer grounded in the retrieved context.
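
As a rough illustration of these three steps (embed, retrieve, generate), here is a minimal sketch that runs without any API key. It assumes a toy bag-of-words embedding in place of a real embedding model, and it stops at building the prompt rather than calling an LLM. `toy_embed`, `toy_retrieve`, and `build_rag_prompt` are hypothetical names used only for this example.

```python
import math
from collections import Counter
from typing import List


def toy_embed(text: str) -> Counter:
    """Toy embedding (assumption): word counts instead of a learned vector."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(count * b[token] for token, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values()))
    norm_b = math.sqrt(sum(c * c for c in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def toy_retrieve(query: str, corpus: List[str], k: int = 2) -> List[str]:
    """Return the k corpus entries most similar to the query."""
    query_vec = toy_embed(query)
    ranked = sorted(corpus, key=lambda doc: cosine(query_vec, toy_embed(doc)), reverse=True)
    return ranked[:k]


def build_rag_prompt(query: str, context_docs: List[str]) -> str:
    """Assemble the prompt that would be sent to the LLM."""
    context = "\n\n".join(context_docs)
    return f"Answer the question using the context.\n\nContext: {context}\n\nQuestion: {query}"


corpus = [
    "earthquakes happen when tectonic plates suddenly move",
    "thunderstorms produce lightning and heavy rain",
    "a clock measures time",
]
query = "what causes earthquakes"
top_docs = toy_retrieve(query, corpus, k=1)
rag_prompt = build_rag_prompt(query, top_docs)
print(rag_prompt)
```

The cells in this section follow the same shape, with `OpenAIEmbeddings` and Chroma handling embedding and retrieval, and a chat model handling generation.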

In [1]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


# Ensure the OpenAI API key is set in the environment
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

In [2]:
from langchain_community.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain import hub
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from typing import List
from langchain.load import loads, dumps

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [3]:
earthquake_doc = WebBaseLoader("https://simple.wikipedia.org/wiki/Earthquake")
docs = earthquake_doc.load()

thunderstorm_doc = WebBaseLoader("https://en.wikipedia.org/wiki/Thunderstorm")
docs.extend(thunderstorm_doc.load())

In [16]:
!rm -rf data/science/  # remove old database files if any

In [4]:
# Split text into chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=20)
text_chunks = text_splitter.split_documents(docs)

vectorstore = Chroma.from_documents(documents=text_chunks, 
                                    embedding=OpenAIEmbeddings(),
                                    persist_directory="data/science")
vectorstore.persist()
retriever = vectorstore.as_retriever(search_kwargs={'k':5})



## 7.1. Reciprocal Rank Fusion

In [20]:
def rrf(results: List[List], k=60):
    # Initialize a dictionary to hold fused scores for each unique document
    fused_scores = {}

    # Iterate through each list of ranked documents
    for docs in results:
        # Iterate through each document in the list, with its rank (position in the list)
        for rank, doc in enumerate(docs):
            # Convert the document to a string format to use as a key (assumes documents can be serialized to JSON)
            doc_str = dumps(doc)
            # If the document is not yet in the fused_scores dictionary, add it with an initial score of 0
            if doc_str not in fused_scores:
                fused_scores[doc_str] = 0
            # Update the score of the document using the RRF formula: 1 / (rank + k),
            # where rank is the 0-based position of the document in the list
            fused_scores[doc_str] += 1 / (rank + k)

    # Sort the documents based on their fused scores in descending order to get the final reranked results
    reranked_results = [
        (loads(doc), score)
        for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
    ]

    # Return the reranked results as a list of tuples, each containing the document and its fused score
    return reranked_results
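
To make the fusion step concrete, here is a small worked example using plain strings instead of `Document` objects. It is a sketch, not the notebook's function: `rrf_strings` is a hypothetical helper that applies the same rule, where each item's fused score is the sum over all ranked lists of `1 / (rank + k)` with a 0-based `rank`. The original RRF formulation is usually stated with 1-based ranks, which shifts the scores slightly.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def rrf_strings(rankings: List[List[str]], k: int = 60) -> List[Tuple[str, float]]:
    """Fuse several ranked lists of string IDs with Reciprocal Rank Fusion."""
    fused: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):
            # An item near the top of a list contributes more than one near the bottom
            fused[doc_id] += 1 / (rank + k)
    # Highest fused score first
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)


rankings = [
    ["a", "b", "c"],  # results for query variant 1
    ["b", "c", "a"],  # results for query variant 2
]
fused = rrf_strings(rankings)
print([doc_id for doc_id, _ in fused])  # ['b', 'a', 'c']
```

Here `b` wins because it is ranked first once and second once (1/60 + 1/61), while `a` is first once but last once (1/60 + 1/62). Documents that rank consistently well across query variants rise to the top.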

In [21]:
from langchain.prompts import ChatPromptTemplate

question = "What is an earthquake?"

prompt = ChatPromptTemplate.from_template(
    """
    You are an intelligent assistant. Your task is to generate 4 questions based on the provided question in different wording and different perspectives to retrieve relevant documents from a vector database. By generating multiple perspectives on the user question, your goal is to help the user overcome some of the limitations of the distance-based similarity search. Provide these alternative questions separated by newlines. Original question: {question}
    """
)

generate_queries = (
    {"question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model='gpt-4o-mini', temperature=0.7)
    | StrOutputParser()
    # Split into one query per line and drop empty lines
    | (lambda x: [q.strip() for q in x.split("\n") if q.strip()])
)


# generate_queries already wraps the raw question string, so it is not wrapped again here
fusion_retrieval_chain = (
    generate_queries
    | retriever.map()
    | rrf
)

fusion_retrieval_chain.invoke(question)



[(Document(metadata={'language': 'en', 'source': 'https://simple.wikipedia.org/wiki/Earthquake', 'title': 'Earthquake - Simple English Wikipedia, the free encyclopedia'}, page_content="Disturbances in the Earth cause earthquakes. Different tectonic plates are slowly moving. When they get stuck, tension builds up in them. Earthquakes occur when tectonic plates suddenly break free, so they start moving quickly. The first point of an earthquake's rupture is called its hypocenter or focus. The epicenter is the point at ground level directly above the hypocenter."),
  0.11374726671031536),
 (Document(metadata={'language': 'en', 'source': 'https://simple.wikipedia.org/wiki/Earthquake', 'title': 'Earthquake - Simple English Wikipedia, the free encyclopedia'}, page_content="Causes of an earthquake[change | change source]\nSee the main article: plate tectonics\nEarthquakes are caused by tectonic movements in the Earth's crust. The main cause is when tectonic plates ride one over the other, caus

In [22]:
def format_context(documents: List):
    return "\n\n".join([doc[0].page_content for doc in documents])


prompt = ChatPromptTemplate.from_template(
    """
    Answer the given question using the provided context.\n\nContext: {context}\n\nQuestion: {question}
    """
)

rag_with_rrf_chain = (
    {'context': fusion_retrieval_chain | format_context, 'question': RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model='gpt-4o-mini', temperature=0)
    | StrOutputParser()
)

rag_with_rrf_chain.invoke(question)

"An earthquake is a sudden release of stress in tectonic plates that causes them to break free and start moving quickly. This release of energy generates waves that travel through the Earth. The first point of an earthquake's rupture is called its hypocenter or focus, while the point directly above it at ground level is known as the epicenter. Earthquakes are caused by disturbances in the Earth's crust, primarily due to the movement of tectonic plates."

## 7.2. Cohere Reranking

In [25]:
#pip install cohere

Note: you may need to restart the kernel to use updated packages.


In [26]:
#pip install -U langchain-cohere

Note: you may need to restart the kernel to use updated packages.


In [None]:
from langchain.retrievers import ContextualCompressionRetriever
from langchain_cohere import CohereRerank

compressor = CohereRerank(model="rerank-english-v2.0", top_n=3)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=retriever
)

compressed_docs = compression_retriever.invoke(question)
compressed_docs

In [None]:
prompt = ChatPromptTemplate.from_template(
    """
    Answer the given question using the provided context.\n\nContext: {context}\n\nQuestion: {question}
    """
)


cohere_with_rag_chain = (
    {'context': compression_retriever, 'question': RunnablePassthrough()}
    | prompt
    | ChatOpenAI(model='gpt-4o-mini', temperature=0)
    | StrOutputParser()
)

cohere_with_rag_chain.invoke(question)

# 8. Generation

In a basic pipeline, RAG cannot tell whether it should retrieve documents at all to answer the question, which data source to retrieve from if it should, whether the retrieved documents are actually relevant to the question, or whether the generated response is correct.

Existing research proposes Self-RAG and Corrective RAG (CRAG) to address these gaps.
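
The corrective flow built in this section (retrieve, grade, then either generate or rewrite the query and search the web) can be sketched in plain Python before any LLM is involved. This is a minimal sketch under stated assumptions: `corrective_rag_sketch` is a hypothetical function, and every step is passed in as a stub callable standing in for the real retriever, grader, rewriter, search tool, and generator.

```python
from typing import Callable, List


def corrective_rag_sketch(
    question: str,
    retrieve_fn: Callable[[str], List[str]],
    grade_fn: Callable[[str, str], bool],
    rewrite_fn: Callable[[str], str],
    search_fn: Callable[[str], List[str]],
    generate_fn: Callable[[str, List[str]], str],
) -> str:
    """Retrieve, keep only relevant documents, and fall back to search if none survive."""
    docs = retrieve_fn(question)
    relevant = [doc for doc in docs if grade_fn(question, doc)]

    if not relevant:
        # Nothing relevant was retrieved: rewrite the question and search elsewhere
        question = rewrite_fn(question)
        relevant = search_fn(question)

    return generate_fn(question, relevant)


# Stub components (assumptions for illustration only)
store = ["thunderstorms bring lightning and thunder"]
stubs = dict(
    retrieve_fn=lambda q: store,
    grade_fn=lambda q, d: bool(set(q.split()) & set(d.split())),  # any shared word
    rewrite_fn=lambda q: q + " biography",
    search_fn=lambda q: [f"web result for: {q}"],
    generate_fn=lambda q, docs: f"answer from {docs[0]}",
)

in_scope = corrective_rag_sketch("what are thunderstorms", **stubs)
out_of_scope = corrective_rag_sketch("who is tom brady", **stubs)
print(in_scope)
print(out_of_scope)
```

The first question shares a word with the stored document, so it is answered from the store; the second shares none, so the question is rewritten and answered from the stubbed search result.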

In [1]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


# Ensure the OpenAI API key is set in the environment
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

In [2]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [3]:
urls = [
    "https://kids.britannica.com/kids/article/thunderstorm/399624",
    "https://kids.britannica.com/kids/article/clock/400097",
    "https://www.cmosc.org/the-science-of-bubbles-for-kids/"
]

docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=30
)
doc_splits = text_splitter.split_documents(docs_list)

# Add to vectorDB
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="rag-chroma",
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

In [4]:
from langchain.pydantic_v1 import BaseModel, Field
from typing import Literal

question = "What are thunderstorm's features?"

class Grader(BaseModel):
    
    """Returns a binary value 'yes' or 'no' based on the relevancy of the document to the question"""
    
    grade: Literal["yes", "no"] = Field(..., description="The relevancy of the document to the question. 'yes' if relevant, 'no' if not relevant")
    
llm = ChatOpenAI(model='gpt-4o-mini', temperature=0)
grading_llm = llm.with_structured_output(Grader)

grading_prompt = ChatPromptTemplate.from_messages(
    [
        ('system', """You are a grader assessing relevance of a retrieved document to a user question. \n 
    If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""),
        ('user', "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)

docs = retriever.invoke(question)
doc_txt = docs[1].page_content

grading_chain = grading_prompt | grading_llm


grading_chain.invoke({'document': doc_txt, 'question': question})

Grader(grade='yes')

In [5]:
grading_chain.invoke({'document': doc_txt, 'question':'What is access control?'})

Grader(grade='no')

In [6]:
from langchain import hub
from langchain_core.output_parsers import StrOutputParser

prompt = hub.pull("rlm/rag-prompt")


generation_chain = prompt | llm | StrOutputParser()

generation_chain.invoke({'context': doc_txt, 'question': question})

'Thunderstorms are characterized by thick clouds, heavy rain or hail, lightning, thunder, and strong winds. They typically occur between May and September in the Northern Hemisphere and November to March in the Southern Hemisphere. Thunderstorms are most frequent in temperate and tropical regions, with some equatorial areas experiencing over 180 days of thunder annually.'

In [7]:
system = """You are a question re-writer that converts an input question to a better version that is optimized \n 
     for web search. Look at the input and try to reason about the underlying semantic intent / meaning."""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "Here is the initial question: \n\n {question} \n Formulate an improved question.",
        ),
    ]
)

re_write_chain = re_write_prompt | llm | StrOutputParser()


re_write_chain.invoke({"question": "thunderstorm features"})

'What are the key features and characteristics of thunderstorms?'

In [8]:
from langchain.agents.load_tools import load_tools
from langchain.agents import initialize_agent, AgentType

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tools = load_tools(["wikipedia"], llm=llm)
web_search_tool= initialize_agent(
    tools, 
    llm, 
    agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    handle_parsing_errors=True,
    verbose = True)



In [9]:
from typing_extensions import TypedDict

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        search_web: whether to search the web
        documents: list of documents
    """
    
    question: str
    documents: list[str]
    search_web: str
    generation: str

In [10]:
from langchain.schema import Document

def retrieve(state):
    """
    Retrieve documents related to the question
    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    
    print('> 📃 Retrieving documents...')
    
    question = state['question']
    docs = retriever.invoke(question)
    
    state['documents'] = [doc.page_content for doc in docs]
    return state

def grade(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """
    
    question = state['question']
    documents = state['documents']
    search_web = "yes"
    
    print('> 🔍 Grading documents...')
    
    filtered_docs = []
    
    for i,doc in enumerate(documents):
        grade = grading_chain.invoke({'document': doc, 'question': question})
        if grade.grade == 'yes':
            print(f'> 📝 \033[92mDocument {i} is relevant\033[0m')
            filtered_docs.append(doc)
            search_web = 'no'
        else:
            print(f'> 📝 \033[91mDocument {i} is irrelevant\033[0m')

            
    state['documents'] = filtered_docs
    state['search_web'] = search_web
    
    return state

def rewrite_query(state):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates question key with a re-phrased question
    """
    print('> ✍🏻 Rewriting the question...')
    question = state['question']
    new_question = re_write_chain.invoke({'question': question})
    
    state['question'] = new_question
    return state

def web_search(state):
    """
    Web search based on the re-phrased question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with appended web results
    """

    print('> 🌎 Web searching...')
    question = state["question"]
    documents = state["documents"]

    # Web search
    docs = web_search_tool.invoke({"input": question})
    # Store the agent's answer as plain text, matching the strings kept by retrieve()
    documents.append(docs["output"])
    
    state["documents"] = documents

    return state

def generate(state):
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("> 🤖 Generating the answer...")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = generation_chain.invoke({"context": documents, "question": question})
    state["generation"] = generation
    return state

In [11]:
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state

    Returns: Method name to execute next
    """

    print("> ❓ Deciding to generate...")
    search_web = state["search_web"]

    if search_web == "yes":
        # All retrieved documents were filtered out as irrelevant,
        # so we rewrite the query before searching the web
        print("> 💡 Decision: \033[91mAll the retrieved documents are irrelevant\033[0m")
        return "rewrite_query"
    else:
        # We have relevant documents, so generate answer
        print("> 💡 Decision: \033[92mRelevant documents found\033[0m")
        return "generate"

In [11]:
#pip install --quiet langgraph

Note: you may need to restart the kernel to use updated packages.


In [12]:
from langgraph.graph import END, StateGraph

# Provide the state graph
workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("grade", grade)  # grade documents
workflow.add_node("generate", generate)  # generate
workflow.add_node("rewrite_query", rewrite_query)  # rewrite_query
workflow.add_node("web_search", web_search)  # web search

# Build graph
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade")
workflow.add_conditional_edges(
    "grade",
    decide_to_generate,
    {
        "rewrite_query": "rewrite_query",
        "generate": "generate",
    },
)
workflow.add_edge("rewrite_query", "web_search")
workflow.add_edge("web_search", "generate")
workflow.add_edge("generate", END)

# Compile
app = workflow.compile()

def run_pipeline(question):
    inputs = {"question": question}
    for output in app.stream(inputs):
        for key, value in output.items():
            if key == 'generate':
                print()
                print(f'Question: {inputs["question"]}')
                print(f"Answer: {value['generation']}")

In [13]:
run_pipeline("What are thunderstorm's features?")

> 📃 Retrieving documents...
> 🔍 Grading documents...
> 📝 Document 0 is relevant
> 📝 Document 1 is relevant
> 📝 Document 2 is relevant
> 📝 Document 3 is relevant
> ❓ Deciding to generate...
> 💡 Decision: Relevant documents found
> 🤖 Generating the answer...

Question: What are thunderstorm's features?
Answer: Thunderstorms are characterized by thick clouds, heavy rain or hail, lightning, thunder, and strong winds. They typically form when hot, moist air rises quickly and cools, leading to the development of clouds and precipitation. Severe thunderstorms can also produce tornadoes and are most common in temperate and tropical regions.


In [13]:
run_pipeline("Who is Tom Brady?")

> 📃 Retrieving documents...
> 🔍 Grading documents...
> 📝 Document 0 is irrelevant
> 📝 Document 1 is irrelevant
> 📝 Document 2 is irrelevant
> 📝 Document 3 is irrelevant
> ❓ Deciding to generate...
> 💡 Decision: All the retrieved documents are irrelevant
> ✍🏻 Rewriting the question...
> 🌎 Web searching...


> Entering new AgentExecutor chain...
Thought: I need to gather information about Tom Brady's career, including key facts and achievements. I'll use the Wikipedia tool to find comprehensive details about him.
Action:
```
{
  "action": "wikipedia",
  "action_input": "Tom Brady"
}
```

Observation: Page: Tom Brady
Summary: Thomas Edward Patrick Brady Jr. (born August 3, 1977) is an American former professional football quarterback who played in the National Football League (NFL) for 23 seasons. He spent his first 20 seasons with the New England Patriots and was a central contributor to the franchise's d

# 9. Generation II

This section improves the previous CRAG implementation by adding “Query Analysis”, as described in the Self-RAG and Adaptive RAG papers.
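
Query analysis here amounts to choosing one of three routes before any retrieval happens: `vectorstore`, `web_search`, or `fallback`. The sketch below shows only that dispatch pattern. It assumes a hypothetical keyword-based `toy_route` in place of the LLM router with structured output used in this section, and the "who is" rule is an arbitrary stand-in for "needs a web search".

```python
from typing import Callable, Dict

# Topics covered by the vector store in this section
VECTORSTORE_TOPICS = ("thunderstorm", "clock", "soap bubble")


def toy_route(question: str) -> str:
    """Hypothetical keyword router; the real router is an LLM, not string matching."""
    q = question.lower()
    if any(topic in q for topic in VECTORSTORE_TOPICS):
        return "vectorstore"
    if q.startswith("who is"):
        return "web_search"
    return "fallback"


# Each route label maps to the handler that should run next (stubs for illustration)
HANDLERS: Dict[str, Callable[[str], str]] = {
    "vectorstore": lambda q: f"retrieve from the vector store for: {q}",
    "web_search": lambda q: f"search the web for: {q}",
    "fallback": lambda q: f"answer from the model's own knowledge: {q}",
}

for q in ["What is a soap bubble?", "Who is tom brady?", "What is math?"]:
    route = toy_route(q)
    print(route, "->", HANDLERS[route](q))
```

Constraining the router's output to a fixed set of labels, as the `Literal` field in the router schema does, is what makes a dictionary dispatch like this safe.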

In [1]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()


# Ensure the OpenAI API key is set in the environment
openai_api_key = os.getenv('OPENAI_API_KEY')
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable is not set")

In [2]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain import hub
from langchain_core.output_parsers import StrOutputParser

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [3]:
urls = [
    "https://en.wikipedia.org/wiki/Thunderstorm",
    "https://en.wikipedia.org/wiki/Clock",
    "https://en.wikipedia.org/wiki/Soap_bubble"
]

docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=30
)
doc_splits = text_splitter.split_documents(docs_list)

# Add to vectorDB
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="rag-chroma",
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

In [4]:
from langchain.pydantic_v1 import BaseModel, Field
from typing import Literal

class QueryRouter(BaseModel):
    """Routes the user query to the appropriate datasource. If the query can be answered using documents about thunderstorms, clocks, or soap bubbles, returns 'vectorstore'. Otherwise returns 'web_search'. If the query can be answered using science internal knowledge, returns 'fallback'."""
    
    datasource: Literal["vectorstore", "web_search", "fallback"] = Field(...,
                description="The datasource to use for answering the query. 'vectorstore' if the query is either related to thunderstorm, clock, or soap bubble \
                        'web_search' if the query is not related to the above topics and requires web search. 'fallback' if the query can be answered using science internal knowledge")
                        
llm = ChatOpenAI(model='gpt-4o-mini', temperature=0)
query_llm = llm.with_structured_output(QueryRouter)

query_router_prompt = ChatPromptTemplate.from_template(
    """You are an expert at routing a user question to a vectorstore or web search. The vectorstore contains documents related to thunderstorm, clock and soap bubble.
Use the vectorstore for questions on these topics. Otherwise, use web_search. If the question can be answered using science internal knowledge, use fallback.\n\n
Question: {question}"""
)

query_routing_chain = (query_router_prompt | query_llm)

In [5]:
question = "What is a soap bubble?"

query_routing_chain.invoke({"question": question})

QueryRouter(datasource='vectorstore')

In [6]:
question = "Who is tom brady?"

query_routing_chain.invoke({"question": question})

QueryRouter(datasource='web_search')

In [7]:
question = "What is math?"

query_routing_chain.invoke({"question": question})

QueryRouter(datasource='fallback')

In [8]:
class DocumentGrader(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    grade: str = Field(..., 
        description="Documents are relevant to the question, 'yes' or 'no'"
    )
    
grader_llm = llm.with_structured_output(DocumentGrader)

grading_prompt = ChatPromptTemplate.from_template(
    """
    You are a grader assessing relevance of a retrieved document to a user question. \n
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.\n\n
    Retrieved document: {document}\n\nQuestion: {question}
    """
)

grading_chain = (grading_prompt | grader_llm)

In [9]:
question = "What is a soap bubble?"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
grading_chain.invoke({"document": doc_txt, "question": question})

DocumentGrader(grade='yes')

In [10]:
qa_prompt = hub.pull('rlm/rag-prompt')

qa_chain = qa_prompt | llm | StrOutputParser()

In [11]:
qa_chain.invoke({"question": "What is a soap bubble?", "context": docs})

'A soap bubble is a thin film of soap or detergent and water that encloses air, forming a hollow sphere with an iridescent surface. They typically last only a few seconds before bursting and are often enjoyed by children and used in artistic performances. The colors seen in soap bubbles result from light wave interference rather than refraction.'

In [12]:
fallback_prompt = ChatPromptTemplate.from_template(
    """
    You are an assistant for question-answering tasks. Answer the question based upon your knowledge. Use three sentences maximum and keep the answer concise.\n\n
    Question: {question}
    """
)

fallback_chain = fallback_prompt | llm | StrOutputParser()

In [13]:
fallback_chain.invoke({'question': "What is math?"})

'Math, or mathematics, is the study of numbers, quantities, shapes, and patterns. It involves the use of symbols and rules to solve problems and understand relationships. Math is fundamental in various fields, including science, engineering, economics, and everyday life.'

In [14]:
class HallucinationEvaluator(BaseModel):
    """Binary score for hallucination present in generation answer."""

    grade: str = Field(...,
        description="Answer is grounded in the facts, 'yes' or 'no'"
    )
    
hallucination_llm = llm.with_structured_output(HallucinationEvaluator)
hallucination_prompt = ChatPromptTemplate.from_template(
    """
    You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n
    Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts.\n\n
    Set of facts: {documents} \n\n LLM generation: {generation}
    """
)

hallucination_chain = hallucination_prompt | hallucination_llm

In [15]:
question = "What are physics of soap bubble?"
print("Question: " + question)
generation = qa_chain.invoke({"question": question, "context": docs})
print("Generation: " + generation)
hallucination_chain.invoke({"documents": docs, "generation": generation})

Question: What are physics of soap bubble?
Generation: The physics of soap bubbles involves the principles of surface tension and light interference. A soap bubble is a thin film of soapy water that forms a hollow sphere, minimizing surface area while enclosing air, which is a demonstration of the minimal surface concept. The colors seen on a soap bubble's surface result from light wave interference, reflecting off the film's front and back surfaces.


HallucinationEvaluator(grade='yes')

In [16]:
class AnswerGrader(BaseModel):
    """Binary score to assess answer addresses question."""

    grade: str = Field(...,
        description="Answer addresses the question, 'yes' or 'no'"
    )

answer_grader_llm = llm.with_structured_output(AnswerGrader)
answer_grader_prompt = ChatPromptTemplate.from_template(
    """
    You are a grader assessing whether an answer addresses / resolves a question. \n
    Give a binary score 'yes' or 'no'. 'Yes' means that the answer resolves the question.\n\n
    Question: {question} \n\n Answer: {answer}
    """
)

answer_grader_chain = answer_grader_prompt | answer_grader_llm

In [17]:
answer_grader_chain.invoke({"question": question, "answer": generation})

AnswerGrader(grade='yes')

In [18]:
answer_grader_chain.invoke({"question": question, "answer": "Tom Brady is an NFL football player born on August 3, 1977. He has led the Patriots to multiple victories, including setting an NFL record with 21 straight wins and becoming the first player ever to win six Super Bowls."})

AnswerGrader(grade='no')

In [19]:
from langchain.agents.load_tools import load_tools
from langchain.agents import initialize_agent, AgentType

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tools = load_tools(["wikipedia"], llm=llm)
web_search_tool = initialize_agent(
    tools,
    llm,
    agent=AgentType.CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    handle_parsing_errors=True,
    verbose=True,
)


In [20]:
from typing_extensions import TypedDict
from typing import List


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """

    question: str
    generation: str
    documents: List[str]
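
The state class above is only a type annotation over a plain dictionary. A minimal sketch of what that means at runtime, assuming nothing beyond the standard library: `SketchState` and `add_generation` are hypothetical names used for illustration, and `TypedDict` is imported from `typing` here rather than `typing_extensions`.

```python
from typing import List, TypedDict


class SketchState(TypedDict, total=False):
    """Hypothetical stand-in for GraphState; total=False lets keys be absent."""

    question: str
    generation: str
    documents: List[str]


def add_generation(state: SketchState) -> SketchState:
    # A node reads the keys it needs and writes its result back into the state.
    state["generation"] = f"stub answer to: {state['question']}"
    return state


# A run can start with only the question; other keys are filled in by later nodes.
state: SketchState = {"question": "What are physics of soap bubble?"}
state = add_generation(state)

print(isinstance(state, dict))
print(sorted(state))
```

Because the state is an ordinary `dict`, nodes can be exercised on their own with hand-built dictionaries before they are wired into a graph.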

In [21]:
from langchain.schema import Document


def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("> 📃 Retrieving documents...")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    state["documents"] = documents
    return state

def web_search(state):
    """
    Web search based on the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Replaces documents key with a list holding the web result
    """

    print("> 🌎 Web searching...")
    question = state["question"]

    # Web search
    docs = web_search_tool.invoke({"input": question})
    web_results = docs["output"]
    web_results = Document(page_content=web_results)

    # Wrap the single Document in a list so downstream nodes always receive a list
    state["documents"] = [web_results]

    return state

def fallback(state):
    """
    Generate an answer using the science knowledge without the vectorstore.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains the generated answer
    """
    print("> 👈 Initiating fallback...")
    question = state["question"]
    generation = fallback_chain.invoke({"question": question})
    
    state["generation"] = generation
    return state

def generate(state):
    """
    Generate an answer using the science knowledge with the vectorstore.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains the generated answer
    """
    print("> 🤖 Generating answer...")
    question = state["question"]
    documents = state["documents"]
    generation = qa_chain.invoke({"question": question, "context": documents})
    
    state["generation"] = generation
    return state

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """

    print("> 🔍 Grading documents...")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    for i, doc in enumerate(documents):
        grade = grading_chain.invoke({'document': doc, 'question': question})
        if grade.grade == 'yes':
            print(f'> 📝 \033[92mDocument {i} is relevant\033[0m')
            filtered_docs.append(doc)

        else:
            print(f'> 📝 \033[91mDocument {i} is irrelevant\033[0m')
            
    state["documents"] = filtered_docs
    return state
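
The filtering loop in `grade_documents` can be tried without any LLM call. The sketch below is a simplified illustration, not the notebook's grader: `filter_relevant` and `keyword_grader` are hypothetical names, and the keyword match only stands in for `grading_chain` so the control flow can be checked offline.

```python
from types import SimpleNamespace


def filter_relevant(documents, question, grader):
    """Keep only the documents the grader marks as relevant."""
    kept = []
    for doc in documents:
        if grader(doc, question).grade == "yes":
            kept.append(doc)
    return kept


def keyword_grader(doc, question):
    # Hypothetical stand-in for grading_chain: 'yes' if any question word appears.
    hit = any(word in doc.lower() for word in question.lower().split())
    return SimpleNamespace(grade="yes" if hit else "no")


docs = [
    "Soap bubble films minimize surface area.",
    "Tom Brady is an NFL player.",
]
kept = filter_relevant(docs, "soap bubble", keyword_grader)
print(kept)
```

Passing the grader in as an argument keeps the filter independent of the model, which makes the empty-list case (all documents rejected) easy to reproduce.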

In [22]:
def route_question(state):
    """
    Route question to the vectorstore, web search, or fallback.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """
    
    question = state["question"]
    route = query_routing_chain.invoke({"question": question})
    
    if route.datasource == "vectorstore":
        print("> 📚 Routing to the vectorstore...")
        return "retrieve"
    
    elif route.datasource == "web_search":
        print("> 🌎 Routing to web search...")
        return "web_search"
    
    else:
        print("> 👈 Routing to fallback...")
        return "fallback"
    
    
def decide_to_generate(state):
    """
    Determines whether to generate an answer or fall back to web search.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("> 🤔 Deciding to generate...")
    filtered_documents = state["documents"]

    if not filtered_documents:
        print("> 💡 Decision: \033[91mAll the retrieved documents are irrelevant\033[0m")
        return "web_search"
    else:
        # We have relevant documents, so generate answer
        print("> 💡 Decision: \033[92mRelevant documents found\033[0m")
        return "generate"
    
    
def evaluate_response(state):
    
    """
    Determines whether the generation is grounded in the documents and answers the question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """
    
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]
    
    print("> 🧠 Evaluating the response for hallucinations...")
    
    hallucination_grade = hallucination_chain.invoke({"documents": documents, "generation": generation})
    
    if hallucination_grade.grade == "yes":
        print("> ✅ \033[92mGeneration is grounded in the documents\033[0m")
        
        print("> 🧠 Evaluating the response for answer...")
        
        answer_grade = answer_grader_chain.invoke({"question": question, "answer": generation})
        
        if answer_grade.grade == "yes":
            print("> ✅ \033[92mAnswer addresses the question\033[0m")
            return "useful"
        else:
            print("> ❌ \033[91mAnswer does not address the question\033[0m")
            return "notuseful"
        
    else:
        print("> ❌ \033[91mGeneration is not grounded in the documents\033[0m")
        return "not supported"
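
The three outcomes of `evaluate_response` can be checked with stub graders. This is a minimal sketch under stated assumptions: `evaluate_with` and `always` are hypothetical helpers, and the stubs replace `hallucination_chain` and `answer_grader_chain` so that no model is called.

```python
from types import SimpleNamespace


def evaluate_with(hallucination_grader, answer_grader, state):
    """Same branching as evaluate_response, with the graders passed in."""
    grounded = hallucination_grader(state["documents"], state["generation"])
    if grounded.grade != "yes":
        return "not supported"
    answered = answer_grader(state["question"], state["generation"])
    return "useful" if answered.grade == "yes" else "notuseful"


def always(grade):
    # Hypothetical stub grader that returns a fixed grade for any input.
    return lambda *args: SimpleNamespace(grade=grade)


state = {"question": "q", "documents": ["d"], "generation": "g"}
results = [
    evaluate_with(always("yes"), always("yes"), state),
    evaluate_with(always("yes"), always("no"), state),
    evaluate_with(always("no"), always("yes"), state),
]
print(results)
```

Note that the answer grader is only consulted once the generation has been judged grounded, so an ungrounded answer is never scored for relevance.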

In [23]:
from langgraph.graph import END, StateGraph

workflow = StateGraph(GraphState)

workflow.add_node("retrieve", retrieve)
workflow.add_node("web_search", web_search)
workflow.add_node("fallback", fallback)
workflow.add_node("generate", generate)
workflow.add_node("grade_documents", grade_documents)

workflow.set_conditional_entry_point(
    route_question,
    {
        'retrieve': 'retrieve',
        'web_search': 'web_search',
        'fallback': 'fallback'
    }
)
workflow.add_edge("retrieve", "grade_documents")
workflow.add_edge("web_search", "generate")
workflow.add_conditional_edges(
    'grade_documents',
    decide_to_generate,
    {
        'web_search': 'web_search',
        'generate': 'generate'
    }
)
workflow.add_conditional_edges(
    'generate',
    evaluate_response,
    {
        'useful': END,
        'notuseful': 'web_search',
        'not supported': 'generate'
    }
)
workflow.add_edge("fallback", END)

app = workflow.compile()

def run_pipeline(question):
    inputs = {"question": question}
    for output in app.stream(inputs):
        for key, value in output.items():
            if key in ('generate', 'fallback'):
                print()
                print(f'Question: {inputs["question"]}')
                print(f"Answer: {value['generation']}")
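
In the graph above, the `'not supported'` edge leads back to `generate`, so a grader that keeps rejecting the answer could repeat that step many times. One possible guard is an attempt counter kept in the state. The sketch below simulates the idea in plain Python; `MAX_ATTEMPTS`, `count_attempt`, `route_with_cap`, and the `attempts` key are all hypothetical and not part of the notebook or of LangGraph.

```python
MAX_ATTEMPTS = 3  # hypothetical cap on generation attempts


def count_attempt(state):
    # Would live inside the generate node: record one more attempt.
    return {**state, "attempts": state.get("attempts", 0) + 1}


def route_with_cap(state, decision):
    """Map a grader decision to the next node, ending once the cap is hit."""
    if decision == "useful" or state["attempts"] >= MAX_ATTEMPTS:
        return "end"
    return "web_search" if decision == "notuseful" else "generate"


state = {"question": "q"}
trace = []
while True:
    state = count_attempt(state)
    # Stub decision: pretend the hallucination grader always rejects the answer.
    next_node = route_with_cap(state, "not supported")
    trace.append(next_node)
    if next_node == "end":
        break

print(trace)
```

In a real graph the counter would need to be declared as a field of the state class so that it persists between nodes.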

In [24]:
run_pipeline("What are physics of soap bubble?")

> 📚 Routing to the vectorstore...
> 📃 Retrieving documents...
> 🔍 Grading documents...
> 📝 Document 0 is irrelevant
> 📝 Document 1 is relevant
> 📝 Document 2 is relevant
> 📝 Document 3 is relevant
> 🤔 Deciding to generate...
> 💡 Decision: Relevant documents found
> 🤖 Generating answer...
> 🧠 Evaluating the response for hallucinations...
> ✅ Generation is grounded in the documents
> 🧠 Evaluating the response for answer...
> ✅ Answer addresses the question

Question: What are physics of soap bubble?
Answer: The physics of soap bubbles involves principles of surface tension and pressure differences. When two bubbles merge, they form a shape that minimizes surface area while maintaining volume, with the common wall bulging into the larger bubble due to higher internal pressure. Additionally, at the intersection of three bubbles, the angles between their walls are equal to 120°, as described by Plateau's laws.


In [25]:
run_pipeline("How to raise a child?")

> 🌎 Routing to web search...
> 🌎 Web searching...


> Entering new AgentExecutor chain...
Thought: Raising a child is a broad topic that encompasses various aspects such as parenting styles, education, emotional support, and discipline. I will look for general guidelines and principles on how to raise a child effectively.
Action:
```
{
  "action": "wikipedia",
  "action_input": "Parenting"
}
```
Observation: Page: Parenting
Summary: Parenting or child rearing promotes and supports the physical, cognitive, social, emotional, and educational development from infancy to adulthood. Parenting refers to the intricacies of raising a child and not exclusively for a biological relationship.
The most common caretakers in parenting are the biological parents of the child in question. However, a caretaker may be an older sibling, step-parent, grandparent, legal guardian, aunt, uncle, other family members, or a family friend. Governments and society may also 

In [26]:
run_pipeline("What is math?")

> 👈 Routing to fallback...
> 👈 Initiating fallback...

Question: What is math?
Answer: Math, or mathematics, is the study of numbers, quantities, shapes, and patterns. It involves the use of symbols and rules to solve problems and understand relationships. Math is fundamental in various fields, including science, engineering, economics, and everyday life.
