In [1]:
import pymupdf4llm
# Convert the source PDF to a single markdown string.
# NOTE(review): this is an absolute, machine-specific Windows path, so the notebook only
# runs as written on the author's machine; consider a path relative to a configurable
# data directory.
md_text = pymupdf4llm.to_markdown(r"C:\Users\Anubhav\OneDrive\Desktop\Projects\GenAI\RAG Langchain\data\time-to-place-our-bets-europes-ai-opportunity.pdf")

Processing C:\Users\Anubhav\OneDrive\Desktop\Projects\GenAI\RAG Langchain\data\time-to-place-our-bets-europes-ai-opportunity.pdf...


In [4]:
md_text

'QuantumBlack, AI by McKinsey\n# Time to place our bets: Europe’s AI opportunity\n\n##### Boosting Europe’s competitiveness across the AI value chain.\n\n_[by Alexander Sukharevsky, Eric Hazan, Sven Smit, Marc-Antoine de la Chevasnerie, Marc de Jong,](https://www.mckinsey.com/our-people/alexander-sukharevsky)_\n_[Solveigh Hieronimus, Jan Mischke, and Guillaume Dagorret](https://www.mckinsey.com/our-people/solveigh-hieronimus)_\n\n\n-----\n\n###### At a glance\n\n— A three-lens approach–on adoption,\n\ncreation, and energy–is required to assess\nEurope’s competitiveness in the emerging\ngenerative AI (gen AI) economy. While much\nof the current discourse centers around large\nlanguage models (LLMs), European policy\nmakers and business leaders must look\nbeyond LLMs. Adopting a holistic approach to\ncapitalize fully on gen AI’s potential could boost\nEuropean labor productivity by up to 3 percent\nannually through 2030.\n\n— On adoption, European organizations lag\n\nbehind their US cou

In [2]:
# Chunking: split the extracted markdown into overlapping pieces before embedding.
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Splitter configured with chunk_size 800 and chunk_overlap 50.
text_splitter  = RecursiveCharacterTextSplitter(
    chunk_size = 800,
    chunk_overlap = 50,
)

# The single markdown string is wrapped in a list; `chunks` is what the vector store
# cell below indexes.
chunks = text_splitter.create_documents([md_text])

In [3]:
len(chunks)

86

In [4]:
# Setting up the vector store
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# Embed every chunk with OpenAIEmbeddings and index it in a Chroma collection.
# NOTE(review): OpenAIEmbeddings() is constructed without arguments, so credentials are
# presumably read from the environment — confirm before running on a fresh kernel.
vectorstore = Chroma.from_documents(
    documents = chunks,
    collection_name = 'rag_langggraph',
    embedding = OpenAIEmbeddings()
)

# Retriever view over the store (no search arguments passed); used by the cells and
# graph nodes further down.
retriever = vectorstore.as_retriever()



In [8]:
retriever.invoke("What is Europe's Gen AI strategy?")

[Document(metadata={}, page_content='Creation of gen AI in Europe\nRegarding creating gen AI, winning in every\nsegment isn’t a realistic strategy for Europe.\nA differentiated approach, based on current\nstrengths, is crucial for the region to stay relevant.\nPotential steps include the following:\n\n— Increase investment. In 2023, US private\n\ninvestments in AI reached $67 billion, compared\nwith just $11 billion in Europe.[60] This gap is\neven more striking when looking specifically\nat investments in gen AI. In 2023, US private'),
 Document(metadata={}, page_content='When it comes to unlocking the full potential\nof gen AI, Europe sits at a crossroads. Given\nthe technology’s novelty, the adoption race\nremains wide open. Europe has numerous\nopportunities to tactically reinforce its\npositions along the value chain while ensuring\nthat it guides gen AI development by ethical\nconsiderations. Policy makers must understand\nthat the stakes here are considerable and extend\nbeyond 

In [5]:
# Setting up the router

from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

# Schema for Router
# Structured-output schema handed to the routing LLM via `with_structured_output`.
# NOTE(review): the class docstring and the Field description are presumably forwarded
# to the model as part of the schema, so treat them as prompt text — confirm before
# rewording either of them.
class Router(BaseModel):
    """
    Route a user query to the most relevant
    path for response generation.

    """
    # Routing decision; the Literal type restricts it to exactly these two values,
    # which `app` later compares against "vectorstore".
    path: Literal["vectorstore","web_search"] = Field(...,
                                                      description = "Given a user question choose to route it to web search or vectorstore")


# LLM

# NOTE(review): `llm` is a module-level name that later cells rebind; the router keeps
# this instance because it is captured in `structured_llm_router` right away.
llm = ChatOpenAI(model = 'gpt-3.5-turbo',temperature = 0)
# Wrap the model so its reply comes back as a `Router` object (see the printed output).
structured_llm_router = llm.with_structured_output(Router)

# Prompt
# System message describing what the vector store covers; everything else is meant to
# go to web search.
system = """ You're an expert at routing a user question to a vector store or web search.
The vectorstore contains documents related to Europe's competitive position and opportunities in the 
generative AI value chain, covering sectors like AI semiconductor manufacturing, cloud infrastructure, 
and energy demands.

Use the vectorstore for questions around these topics.

Otherwise use Web Search

"""
route_prompt = ChatPromptTemplate.from_messages(
    [
        ('system',system),
        ('human',"{question}")
    ]
)

# Prompt piped into the structured-output model; invoke with {'question': ...}.
router_chain = route_prompt | structured_llm_router


# Testing
# In-domain question; the result is stored in `route` but this cell never displays it.
route = router_chain.invoke({'question': 'How much of Europe’s productivity growth can generative AI potentially contribute annually by 2030?'})

# Out-of-domain question; only this second result is printed below.
route_2 = router_chain.invoke({'question': 'What is capital of Finland?'})


print(route_2)




For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  exec(code_obj, self.user_global_ns, self.user_ns)


path='web_search'


In [6]:
# Retrieval Grader

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

# Schema for grading
# Structured-output schema for the retrieval relevance grader.
# NOTE(review): the docstring and Field description are presumably sent to the model as
# part of the schema, so they are left verbatim — confirm before rewording.

class GradeDocuments(BaseModel):
    """
    Binary score for quanitifying the relevance of the retrieved documents

    """
    # Declared as a free-form str; downstream code compares it against the literal 'yes'.
    binary_score: str = Field(
        description = "Documents are relevant to the question,'yes' or 'no'"
    )

# LLM 

# Rebinds the module-level `llm`; same model and temperature as the router cell.
llm = ChatOpenAI(model = 'gpt-3.5-turbo',temperature = 0)

# Getting structured o/p from llm for grading
llm_grader = llm.with_structured_output(GradeDocuments)

# Prompt
# Deliberately lenient grading instructions: keyword or semantic overlap counts as relevant.
system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""

grading_prompt = ChatPromptTemplate.from_messages(
    [
        ('system',system),
        ('human',"Retrieved_document: \n\n {document} \n\n User question: {question}"),
    ]
)

# Invoke with {'question': ..., 'document': ...}; the result exposes `.binary_score`.
grader_chain = grading_prompt | llm_grader

# Sample question used to exercise the retriever, the grader and the later chains.
question = "Which country in Europe has a leading position in AI semiconductor equipment manufacturing?"

# Use `invoke`, as the earlier retriever cell does; `get_relevant_documents` is
# deprecated and emitted a warning when this cell ran.
docs = retriever.invoke(question)

# Spot-check the grader on the third retrieved chunk (index 2).
doc_text = docs[2].page_content

print(grader_chain.invoke({'question': question,
                           'document':doc_text}))

  docs = retriever.get_relevant_documents(question)


binary_score='no'


In [7]:
# Generating response

from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Prompt
# Answer-generation template: restricts the model to the supplied context and caps the
# answer at three sentences. Expects the variables `question` and `context`.
prompt_template = """You are an assistant for question-answering tasks who answers questions based 
only on the context that are provided to you.
If you don't know the answer, just say that you don't know.
Follow these instructions strictly:

- Use three sentences maximum and keep the answer concise.
- Do not make up anything from your end, only refer to the context provided for answer generation
- If the context doesn't have required information to answer the question, respond with "I do not know"

question: {question}
search_results: {context} 
Answer:
"""

# NOTE(review): `prompt` and `prompt_template` are rebound by the web-search cell later;
# a chain built from them keeps the object it was built with.
prompt = ChatPromptTemplate.from_template(prompt_template)

# llm 

# Unlike the router and grader models, this instance does not set `temperature`.
llm = ChatOpenAI(model = 'gpt-3.5-turbo')

# combining the retrieved docs
def format_docs(docs):
    """Return the `page_content` of every item in `docs`, joined by a blank line."""
    separator = "\n\n"
    page_texts = [item.page_content for item in docs]
    return separator.join(page_texts)

# Chain
# Prompt -> model -> plain string; invoke with {'context': ..., 'question': ...}.
rag_chain = prompt | llm | StrOutputParser()

# Response

# Pass the chunk texts joined by `format_docs` (defined above for this purpose) rather
# than the raw Document list, whose repr would otherwise be pasted into the prompt.
response = rag_chain.invoke({'context': format_docs(docs),
                             'question': question})

print(response)


The Netherlands has a leading position in AI semiconductor equipment manufacturing with companies like ASML being market leaders in lithography machines. European companies also lead in other equipment segments such as atomic layer deposition and metal-organic chemical vapor deposition. However, the context does not specify a particular country in Europe having the leading position in AI semiconductor equipment manufacturing.


In [8]:
# Hallucinations measure

# Schema
# Structured-output schema for the hallucination grader.
# NOTE(review): docstring and Field description are presumably sent to the model as
# part of the schema, so they are left verbatim — confirm before rewording.

class GradeHallucinations(BaseModel):
    """
    Binary Score for indicating if hallucinations
    are present in the generated answer

    """
    # Per the description below, 'yes' means the answer IS grounded (i.e. no
    # hallucination); `hallucination_check` relies on that polarity.
    binary_score: str = Field(
        description = "Generated answer is grounded in the context provided, 'yes' or 'no'"
        )

# Hallucination grading LLM
# Rebinds the module-level `llm` again; temperature 0 like the other graders.
llm = ChatOpenAI(model = 'gpt-3.5-turbo', temperature = 0)
llm_hallucinations = llm.with_structured_output(GradeHallucinations)

# Prompt
system = """You are a grader assessing whether an Generated answer is grounded in / supported by a set of retrieved context. \n 
     Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts."""

hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ('system',system),
        ('human',"Context: \n\n {documents} \n\n Generated answer: {response}"),

    ]
)

# Invoke with {'documents': ..., 'response': ...}; the result exposes `.binary_score`.
hallucination_grader_chain = hallucination_prompt | llm_hallucinations

# Smoke test: `docs` is passed as the raw list returned by the retriever, and `response`
# is the answer produced by the generation cell above.
hallucination_grader_chain.invoke({'documents':docs,
                                   'response': response})

GradeHallucinations(binary_score='yes')

In [9]:
### Answer Grader


# Data model
# Structured-output schema for the answer-usefulness grader.
# NOTE(review): docstring and Field description are presumably sent to the model as
# part of the schema, so they are left verbatim — confirm before rewording.
class GradeAnswer(BaseModel):
    """Binary score to assess answer addresses question."""

    # Declared as a free-form str; `hallucination_check` compares it against 'yes'.
    binary_score: str = Field(
        description="Answer addresses the question, 'yes' or 'no'"
    )


# LLM with function call
# Rebinds the module-level `llm`; temperature 0 like the other graders.
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature = 0)

structured_llm_grader = llm.with_structured_output(GradeAnswer)

# Prompt
system = """You are a grader assessing whether an answer addresses / resolves a question \n 
     Give a binary score 'yes' or 'no'. Yes' means that the answer resolves the question."""
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "User question: \n\n {question} \n\n LLM generation: {response}"),
    ]
)

# Invoke with {'question': ..., 'response': ...}; the result exposes `.binary_score`.
answer_grader = answer_prompt | structured_llm_grader
# Smoke test on the sample question/answer from the cells above; shown as the cell output.
answer_grader.invoke({"question": question, "response": response})

GradeAnswer(binary_score='no')

In [14]:
question

'Which country in Europe has a leading position in AI semiconductor equipment manufacturing?'

In [10]:
### Question Re-writer

# LLM
# Rebinds the module-level `llm`; no `temperature` is set for this instance.
llm = ChatOpenAI(model="gpt-3.5-turbo")

# Prompt
system = """You are a question re-writer that converts an input question to a better version that is optimized \n 
     for vectorstore retrieval. Look at the input and try to reason about the underlying semantic intent / meaning."""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "Here is the initial question: \n\n {question} \n Formulate an improved question.",
        ),
    ]
)

# Chain that maps {'question': ...} to a rewritten question string. Note that this is a
# chain, not a graph node function — `rewrite_question` is the state-aware wrapper.
question_rewriter = re_write_prompt | llm | StrOutputParser()
# Smoke test on the sample question; shown as the cell output.
question_rewriter.invoke({"question": question})

'Which European country is known for its prominent role in manufacturing AI semiconductor equipment?'

In [24]:
# Tool response chain

from langchain_community.tools.tavily_search import TavilySearchResults
import time


# Tavily web search tool.
# NOTE(review): confirm that `k` is the argument that limits the number of results.
web_search_tool = TavilySearchResults(k=3)

# Tool-aware model: `tool_call` inspects its reply for `tool_calls`.
# NOTE(review): `llm` here is whichever instance the most recently executed cell bound,
# since several cells above rebind that name.
llm_web_search = llm.bind_tools([web_search_tool])


# Answer template for web results; expects the variables `question` and `context`.
# NOTE(review): this rebinds `prompt_template` and `prompt`, which the RAG generation
# cell also defines.
prompt_template = """You are an assistant for question-answering tasks who answers questions based 
on the search results context that are provided to you. 
Use the following pieces of search results to answer the question.
For every single search result, there is a url and content. Refer to the content for the response and also
mention the corresponding links of the results which you referred to. 
If you don't know the answer, just say that you don't know. 
Use three sentences maximum and keep the answer concise.
question: {question}
search_results: {context} 
Answer:
"""

prompt = ChatPromptTemplate.from_template(prompt_template)

# Post processing the search results
def format_search_results(search_results):
    """Stringify each search result and join the pieces with a blank line."""
    rendered = map(str, search_results)
    return "\n\n".join(rendered)


# Chain for the final response
# Built from the web-search `prompt` defined just above and the plain module-level `llm`
# (not the tool-bound `llm_web_search`); the output is parsed to a string.
web_search_chain = prompt | llm | StrOutputParser() 

# Function to get LLM response from web searched results
def tool_call(input:str):
    """
    Answer `input` via the tool-bound LLM, printing the outcome.

    The tool-aware model is asked first. When it requests no tool call, its own
    content is printed. Otherwise the `query` argument of its first tool call is sent
    to the web search tool, the results are formatted and printed, and the web-search
    chain turns them into a final answer that is printed as well.

    Args:
        input (str): The user question.

    Returns:
        None. All results are written to stdout.
    """
    llm_reply = llm_web_search.invoke(input)

    # No tool requested: fall back to whatever the model answered directly.
    if not llm_reply.tool_calls:
        print("---GENERIC LLM RESPONDS---")
        print(llm_reply.content)
        return

    print("---CALLING TAVILY TOOL---")

    # Only the first requested tool call is honoured.
    first_call = llm_reply.tool_calls[0]
    search_query = first_call['args']['query']
    hits = web_search_tool.invoke(search_query)

    context = format_search_results(hits)
    print(f"The search results is : \n\n {context} \n")

    # Stitch the search results into a final answer.
    answer = web_search_chain.invoke({'question': input, 'context': context})
    print(f"Reponse: \n\n {answer}")
        

In [26]:
tool_call(input = "what is capital of Finland?")

---CALLING TAVILY TOOL---
The search results is : 

 {'url': 'https://www.worldatlas.com/articles/what-is-the-capital-of-finland.html', 'content': 'Helsinki is the most populous city and the capital of Finland, located on the Gulf of Finland. It was founded in 1550 by Sweden and became the capital in 1812 under Russian rule.'}

{'url': 'https://simple.wikipedia.org/wiki/Helsinki', 'content': 'Helsinki is the capital city and the largest city of Finland, located on the coast of the Gulf of Finland. Learn about its history, geography, transport, culture and attractions from this simple and clear article.'}

{'url': 'https://www.wikiwand.com/en/articles/Helsinki', 'content': "Together with the cities of Espoo, Vantaa and Kauniainen—and surrounding commuter towns, [9] including the neighbouring municipality of Sipoo to the east [10] —Helsinki forms a metropolitan area.This area is often considered Finland's only metropolis and is the world's northernmost metropolitan area with over one mil

Up until here we have written code to facilitate:
1. Retrieved document grader
2. Hallucination grader
3. Router
4. Answer Grader
5. web search tool
6. LLM Response

Now, we step into building the workflow using LangGraph


In [27]:
# Defining the Graph State
from typing import List
from typing_extensions import TypedDict

class GraphState(TypedDict):
    """
    State dictionary passed between the nodes of the adaptive RAG workflow.

    Keys:
    1. question: the user question (replaced by the rewritten one after a rewrite)
    2. response: the generated LLM response
    3. documents: the documents currently held for the question

    """

    question: str
    response: str
    # NOTE(review): annotated as List[str], but the nodes in this notebook store the
    # retriever's results here and read `.page_content` from each item — confirm the
    # intended element type.
    documents: List[str]


In [28]:
# Nodes

# Retrieval Node

def retrieve(state):
    """
    Retrieve documents similar to the question from the vector store.

    Args:
        state (dict): Current graph state; must contain 'question'.

    Returns:
        dict: State update with 'documents' (the retriever's results) and the
        unchanged 'question'.
    """
    print("-----Retrieval------")

    # Fetch the question from the current state
    question = state['question']

    # `invoke` is used instead of the deprecated `get_relevant_documents`, matching the
    # standalone retriever call earlier in the notebook and avoiding the warning.
    documents = retriever.invoke(question)

    return {'documents':documents,
            'question': question}

# Response generation Node

def generate(state):
    """
    Generate an answer to the question from the documents held in the state.

    Args:
        state (dict): Current graph state; must contain 'question' and 'documents'.

    Returns:
        dict: State update carrying the unchanged 'documents' and 'question' plus the
        generated 'response'.
    """
    print("-----Generating Response------")

    # Fetching the question and documents from the current state
    question = state['question']
    documents = state['documents']

    # Feed the prompt the chunk texts joined by `format_docs` instead of the raw
    # Document list, whose repr would otherwise be pasted into the prompt verbatim.
    # The state itself keeps the original document objects for the graders.
    response = rag_chain.invoke({'context':format_docs(documents),
                                 'question':question})

    # Returning the updated state
    return {'documents':documents,
            'question':question,
            'response':response}

# Grader Node

def grade_documents(state):
    """
    Keep only the retrieved documents that the relevance grader marks 'yes'.

    Args:
        state (dict): Current graph state; must contain 'question' and 'documents'.

    Returns:
        dict: State update whose 'documents' holds the documents graded relevant
        (possibly empty), together with the unchanged 'question'.
    """
    print("-----Checking Document Relevance-----")

    question, documents = state['question'], state['documents']

    relevant_docs = []
    for candidate in documents:
        # One grader call per document, judged against the same question.
        verdict = grader_chain.invoke(
            {'question':question, 'document':candidate.page_content}
        ).binary_score

        if verdict != 'yes':
            print("---Document Irrelevant!---")
            continue

        print("---Document Relevant!---")
        relevant_docs.append(candidate)

    return {'documents':relevant_docs, 'question':question}

# Rewriter Node

def rewrite_question(state):
    """
    Replace the question in the state with a rewritten version.

    Args:
        state (dict): Current graph state; must contain 'question' and 'documents'.

    Returns:
        dict: State update with the unchanged 'documents' and the rewritten 'question'.
    """
    print("----Rewriting Question-----")

    original_question = state['question']
    documents = state['documents']

    # The rewriter chain returns the improved question as a string.
    improved_question = question_rewriter.invoke({'question': original_question})

    return {"documents": documents, "question": improved_question}


In [None]:
# Defining the graph conditional edges

# Defining the conditional edge that checks availability of
# relevant docs to generate the response
def relevance_check(state):
    """
    Choose the next node after document grading.

    Args:
        state (dict): Current graph state; 'documents' holds the graded documents.

    Returns:
        str: "generate" when at least one document survived grading, otherwise
        'question_rewriter'.
    """
    print("---Assessing Graded documents---")

    has_relevant_docs = bool(state['documents'])

    if has_relevant_docs:
        print("---Relevant Documents found!---")
        print("---Moving ahead with response generation---")
        return "generate"

    # Nothing usable was retrieved: send the question off to be rewritten.
    print("---No relevant document was retrieved--")
    print("---Redirecting for query translation---")
    return 'question_rewriter'

# Defining the conditional edge that checks for hallucination
def hallucination_check(state):
    """
    Decide what happens to a generated response.

    The hallucination grader is consulted first; only a response it marks 'yes'
    (grounded) is passed on to the answer grader.

    Args:
        state (dict): Current graph state; must contain 'question', 'documents'
            and 'response'.

    Returns:
        str: "not supported" when the response is not graded as grounded,
        "useful" when it is grounded and graded as answering the question,
        "not useful" when it is grounded but does not answer the question.
    """
    print("---Checking for HALLUCINATIONS!---")

    question = state["question"]
    documents = state["documents"]
    response = state["response"]

    grounding = hallucination_grader_chain.invoke({'documents':documents,
                                                   'response':response})

    if grounding.binary_score != 'yes':
        print("---HALLUCINATIONS Found!!---")
        return "not supported"

    print("---NO Hallucinations!---")
    print("---Proceeding to Answer Grader---")

    usefulness = answer_grader.invoke({"question":question,
                                       "response":response})

    if usefulness.binary_score == 'yes':
        print("---Instrumental Answer---")
        return "useful"

    print("---Answer not valid---")
    return "not useful"
        

# Node for answer printing
def answer_output(state):
    """Print the 'response' held in the graph state; nothing is returned."""
    final_text = state['response']
    print("Response: \n\n {}".format(final_text))



Build Graph using the fragments we created

In [29]:
from langgraph.graph import END, StateGraph, START

# Instantiating the builder object
builder = StateGraph(GraphState)

# Define the nodes
builder.add_node('retrieve',retrieve)
builder.add_node('grade_documents',grade_documents)
builder.add_node('generate',generate)
# Register the state-aware node function `rewrite_question`, not the bare
# `question_rewriter` chain: the chain returns a plain string, whereas a node has to
# return a state update (here the rewritten 'question' plus 'documents').
builder.add_node('question_rewriter',rewrite_question)
builder.add_node('answer_output',answer_output)

# Define edges
builder.add_edge(START,'retrieve')
builder.add_edge('retrieve','grade_documents')
# `relevance_check` returns the name of the next node directly
# ('question_rewriter' or 'generate'), so no mapping is supplied.
builder.add_conditional_edges(
    'grade_documents',
    relevance_check,
)

# A rewritten question goes back through retrieval and grading.
builder.add_edge('question_rewriter','retrieve')

# NOTE(review): 'not supported' loops back into 'generate' and no retry cap is defined
# in this cell — confirm that repeated regeneration is bounded elsewhere.
builder.add_conditional_edges('generate',
                              hallucination_check,
                              # Mapping the outputs to the required nodes
                              {
                                  'not supported' : 'generate',
                                  'useful': 'answer_output',
                                  'not useful': 'question_rewriter'
                              }
                              )

builder.add_edge('answer_output',END)

# Compile
graph = builder.compile()

In [30]:
from pprint import pprint

# Run
# Only 'question' is supplied up front; 'documents' and 'response' are written by the
# retrieve/grade and generate nodes.
inputs = {"question": "Which country in Europe has a leading position in AI semiconductor equipment manufacturing?"}
# NOTE(review): this rebinds `response`, which earlier cells use for the generated
# answer string, to whatever `graph.invoke` returns — re-running the grader cells
# afterwards would pick up this object instead.
response = graph.invoke(inputs)



-----Retrieval------
-----Checking Document Relevance-----
---Document Irrelevant!---
---Document Relevant!---
---Document Irrelevant!---
---Document Irrelevant!---
---Assessing Graded documents---
---Relevant Documents found!---
---Moving ahead with response generation---
-----Generating Response------
---Checking for HALLUCINATIONS!---
---NO Hallucinations!---
---Proceeding to Answer Grader---
---Instrumental Answer---
Response: 

 The Netherlands has a leading position in AI semiconductor equipment manufacturing.


In [31]:
# Stitching together graph and the chain

def app(input:str):
    """
    Route a question to the vector-store graph or to the web-search tool.

    Args:
        input (str): The user question.

    Returns:
        None. The chosen path prints its own progress and answer.
    """
    payload = {'question':input}
    chosen_path = router_chain.invoke(payload).path

    # Anything the router does not send to the vector store goes to web search.
    if chosen_path != "vectorstore":
        print("---ROUTED TO WEB TOOL---")
        tool_call(input)
        return

    print("---ROUTED TO VECTORSTORE---")
    graph.invoke(payload)
        

In [34]:
app("Which country in Europe has a leading position in AI semiconductor equipment manufacturing?")


---ROUTED TO VECTORSTORE---
-----Retrieval------
-----Checking Document Relevance-----
---Document Irrelevant!---
---Document Relevant!---
---Document Irrelevant!---
---Document Irrelevant!---
---Assessing Graded documents---
---Relevant Documents found!---
---Moving ahead with response generation---
-----Generating Response------
---Checking for HALLUCINATIONS!---
---NO Hallucinations!---
---Proceeding to Answer Grader---
---Instrumental Answer---
Response: 

 The Netherlands has a leading position in AI semiconductor equipment manufacturing, with companies like ASML being market leaders in lithography machines. Other European companies like ASM International and AIXTRON also lead in equipment segments related to semiconductor manufacturing. European companies are less present in certain niches like dry etchers and dicing machines.
