### 1: Library Imports and Environment Setup

In [1]:
import os
import uuid
from typing import List, Dict, TypedDict
from dotenv import load_dotenv
from IPython.display import Image, display
from langgraph.graph import START, END, StateGraph
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader, PyPDFLoader
from langchain_community.vectorstores import SKLearnVectorStore
from langchain_openai import OpenAIEmbeddings
from langchain_core.tools import tool
from langchain.schema import Document
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_ollama import ChatOllama
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langsmith import Client
from langchain import hub
from langchain_openai import ChatOpenAI
from langsmith.schemas import Example, Run
from langsmith.evaluation import evaluate

# Read a local .env file (if any) into the process environment
load_dotenv()

# Service credentials; each one is None when its variable is not set
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
TAVILY_API_KEY = os.environ.get("TAVILY_API_KEY")
LANGCHAIN_API_KEY = os.environ.get("LANGCHAIN_API_KEY")

# The source PDF is looked up under ./data relative to the working directory
current_dir = os.getcwd()
file_path = os.path.join(current_dir, "data", "aws-overview.pdf")

USER_AGENT environment variable not set, consider setting it to identify your requests.


### 2: Data Loading and Preprocessing

In [2]:
def load_and_process_data(file_path, chunk_size=250, chunk_overlap=0, k=2):
    """Load a PDF, split it into chunks, and build a retriever over the chunks.

    Args:
        file_path: Path of the PDF handed to ``PyPDFLoader``.
        chunk_size: Chunk size passed to the tiktoken-based text splitter.
        chunk_overlap: Overlap between consecutive chunks, passed to the splitter.
        k: Number of chunks the retriever returns per query.

    Returns:
        A retriever created from an ``SKLearnVectorStore`` whose entries are
        embedded with ``OpenAIEmbeddings``.
    """
    documents = PyPDFLoader(file_path).load()

    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    doc_splits = text_splitter.split_documents(documents)

    vectorstore = SKLearnVectorStore.from_documents(
        documents=doc_splits,
        embedding=OpenAIEmbeddings(),
    )
    return vectorstore.as_retriever(search_kwargs={"k": k})

### 3: Model Initialization and Configuration

In [3]:
def initialize_models():
    """Create the web-search tool and the two LLM chains the graph nodes use.

    Returns:
        A ``(web_search_tool, rag_chain, retrieval_grader)`` tuple: the Tavily
        search tool, a chain that answers a question from documents as plain
        text, and a chain that grades one document's relevance and parses the
        model's reply as JSON.
    """
    search_tool = TavilySearchResults()

    # Answer generation: llama3.1 at temperature 0, output parsed to a string.
    answer_llm = ChatOllama(model="llama3.1", temperature=0)
    answer_template = """You are an assistant for question-answering tasks. 
        Use the following documents to answer the question. 
        If you don't know the answer, just say that you don't know. 
        Use three sentences maximum and keep the answer concise:
        Question: {question} 
        Documents: {documents} 
        Answer: 
        """
    answer_prompt = PromptTemplate(
        template=answer_template,
        input_variables=["question", "documents"],
    )
    answer_chain = answer_prompt | answer_llm | StrOutputParser()

    # Relevance grading: same model, but asked to emit JSON.
    grader_llm = ChatOllama(model="llama3.1", format="json", temperature=0)
    grader_template = """You are a grader assessing relevance of a retrieved document to a user question.
        Here is the retrieved document: \n\n {document} \n\n
        Here is the user question: {question} \n
        If the document contains keywords related to the user question, grade it as relevant.
        It does not need to be a stringent test. The goal is to filter out erroneous retrievals.
        Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question.
        Provide the binary score as a JSON with a single key 'score' and no preamble or explanation."""
    grader_prompt = PromptTemplate(
        template=grader_template,
        input_variables=["question", "document"],
    )
    grader_chain = grader_prompt | grader_llm | JsonOutputParser()

    return search_tool, answer_chain, grader_chain


### 4: Corrective RAG Pipeline Implementation

In [4]:

class GraphState(TypedDict):
    """State dictionary passed between the nodes of the corrective RAG graph."""

    # The user question being answered.
    question: str
    # Answer text written by the `generate` node.
    generation: str
    # "Yes"/"No" flag written by `grade_documents`; "Yes" routes to web search.
    search: str
    # Retrieved and web-search results. These are Document objects (the nodes
    # read `page_content` from them), not plain strings.
    documents: List[Document]
    # Names of the steps executed so far, in execution order.
    steps: List[str]

def retrieve(state):
    """Fetch documents for the question from the module-level ``retriever``.

    Args:
        state: Graph state; ``question`` and ``steps`` are read.

    Returns:
        A state update holding the retrieved ``documents``, the unchanged
        ``question``, and ``steps`` extended with ``"retrieve_documents"``.
    """
    question = state["question"]
    documents = retriever.invoke(question)
    # Build a new list rather than appending in place, so the list object
    # owned by the incoming state (and by whoever created it) is not mutated.
    steps = [*state["steps"], "retrieve_documents"]
    return {"documents": documents, "question": question, "steps": steps}

def grade_documents(state):
    """Keep only the documents the grader marks relevant and flag web search.

    Each document's ``page_content`` is sent to the module-level
    ``retrieval_grader`` together with the question. Documents graded "yes"
    are kept; any other grade drops the document and sets ``search`` to
    ``"Yes"``.

    Args:
        state: Graph state; ``question``, ``documents`` and ``steps`` are read.

    Returns:
        A state update with the filtered ``documents``, the unchanged
        ``question``, the ``search`` flag (``"Yes"`` or ``"No"``), and
        ``steps`` extended with ``"grade_document_retrieval"``.
    """
    question = state["question"]
    documents = state["documents"]
    # New list instead of an in-place append on the incoming state's list.
    steps = [*state["steps"], "grade_document_retrieval"]

    filtered_docs = []
    search = "No"
    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        # The grade comes from an LLM: tolerate a missing 'score' key, a
        # non-dict reply, and variations in case/whitespace ("Yes", " yes ").
        grade = score.get("score") if isinstance(score, dict) else None
        if str(grade).strip().lower() == "yes":
            filtered_docs.append(d)
        else:
            search = "Yes"

    # With nothing relevant to answer from (including an empty retrieval),
    # fall back to web search instead of generating from no documents.
    if not filtered_docs:
        search = "Yes"

    return {"documents": filtered_docs, "question": question, "search": search, "steps": steps}

def decide_to_generate(state):
    """Choose the branch to follow after grading.

    Returns ``"search"`` when the state's ``search`` flag is exactly ``"Yes"``,
    otherwise ``"generate"``.
    """
    if state["search"] == "Yes":
        return "search"
    return "generate"

def web_search(state):
    """Add web-search results for the question to the state's documents.

    Args:
        state: Graph state; ``question`` and ``steps`` are read, and
            ``documents`` if present.

    Returns:
        A state update with ``documents`` (the existing ones followed by one
        ``Document`` per usable search result, with the result URL in its
        metadata), the unchanged ``question``, and ``steps`` extended with
        ``"web_search"``.
    """
    question = state["question"]
    # Work on copies so the lists owned by the incoming state are not mutated.
    documents = list(state.get("documents") or [])
    steps = [*state["steps"], "web_search"]

    web_results = web_search_tool.invoke({"query": question})
    # The Tavily tool reports a failed search by returning an error string
    # rather than raising; indexing into that string would crash here, so only
    # a list of result dicts is consumed.
    if isinstance(web_results, list):
        documents.extend(
            Document(page_content=d["content"], metadata={"url": d.get("url")})
            for d in web_results
            if isinstance(d, dict) and "content" in d
        )
    return {"documents": documents, "question": question, "steps": steps}

def generate(state):
    """Produce the answer by running the module-level ``rag_chain``.

    Args:
        state: Graph state; ``question``, ``documents`` and ``steps`` are read.
            The documents are handed to the chain as they are.

    Returns:
        A state update with the unchanged ``documents`` and ``question``, the
        chain's output under ``generation``, and ``steps`` extended with
        ``"generate_answer"``.
    """
    question = state["question"]
    documents = state["documents"]
    generation = rag_chain.invoke({"documents": documents, "question": question})
    # New list instead of an in-place append on the incoming state's list.
    steps = [*state["steps"], "generate_answer"]
    return {"documents": documents, "question": question, "generation": generation, "steps": steps}

def build_graph():
    """Assemble and compile the corrective RAG state graph.

    Flow: retrieve -> grade_documents -> (web_search ->) generate -> END,
    where ``decide_to_generate`` picks between the web-search detour and going
    straight to generation.

    Returns:
        The compiled graph.
    """
    graph_builder = StateGraph(GraphState)

    # Register nodes in a fixed order: name -> handler.
    node_handlers = (
        ("retrieve", retrieve),
        ("grade_documents", grade_documents),
        ("generate", generate),
        ("web_search", web_search),
    )
    for node_name, handler in node_handlers:
        graph_builder.add_node(node_name, handler)

    graph_builder.set_entry_point("retrieve")
    graph_builder.add_edge("retrieve", "grade_documents")

    # Map decide_to_generate's return values onto node names.
    routes = {"search": "web_search", "generate": "generate"}
    graph_builder.add_conditional_edges("grade_documents", decide_to_generate, routes)

    graph_builder.add_edge("web_search", "generate")
    graph_builder.add_edge("generate", END)

    return graph_builder.compile()

### 5: Example Usage of the Pipeline

In [5]:
def predict_custom_agent_answer(example: dict):
    """Run the module-level ``custom_graph`` on one example.

    Args:
        example: Mapping whose ``"input"`` entry is the question.

    Returns:
        A dict with the graph's ``generation`` under ``"response"`` and its
        recorded ``steps`` under ``"steps"``.
    """
    # A fresh thread id is generated for every invocation.
    run_config = {"configurable": {"thread_id": str(uuid.uuid4())}}
    initial_state = {"question": example["input"], "steps": []}
    final_state = custom_graph.invoke(initial_state, run_config)
    return {
        "response": final_state["generation"],
        "steps": final_state["steps"],
    }

### 6: LangSmith Evaluation Setup

In [6]:
def setup_langsmith_evaluation():
    """Make sure the LangSmith evaluation dataset exists and return its name.

    When no dataset with the expected name exists, it is created and filled
    with the question/reference-answer pairs below; an existing dataset is
    left untouched.

    Returns:
        The dataset name.
    """
    client = Client()

    # (question, reference answer) pairs
    qa_pairs = [
        ("What is AWS EC2?", "AWS EC2 (Elastic Compute Cloud) is a web service that provides resizable compute capacity in the cloud. It allows users to rent virtual servers (instances) to run applications, offering scalability and flexibility. EC2 is a core component of Amazon's cloud computing platform, enabling businesses to quickly deploy and manage computing resources without investing in physical hardware."),
        ("What AWS services can I use to host a Python web app?", "For hosting a Python web app on AWS, you can use AWS Elastic Beanstalk for a fully managed solution, EC2 for more control over your environment, or Lambda with API Gateway for a serverless approach. The choice depends on your app's requirements, desired level of control, and scalability needs.")
    ]

    dataset_name = "Corrective RAG Agent Testing"
    if client.has_dataset(dataset_name=dataset_name):
        return dataset_name

    dataset = client.create_dataset(dataset_name=dataset_name)
    inputs = tuple({"input": text} for text, _ in qa_pairs)
    outputs = tuple({"output": label} for _, label in qa_pairs)
    client.create_examples(inputs=inputs, outputs=outputs, dataset_id=dataset.id)

    return dataset_name

def answer_evaluator(run, example) -> dict:
    """
    A simple evaluator for RAG answer accuracy.

    The run's ``response`` is graded against the example's reference answer by
    GPT-4 using the ``langchain-ai/rag-answer-vs-reference`` hub prompt.

    Args:
        run: Evaluated run; ``run.outputs["response"]`` is the prediction.
        example: Dataset example; ``inputs["input"]`` is the question and
            ``outputs["output"]`` the reference answer.

    Returns:
        ``{"key": "answer_v_reference_score", "score": <score>}``. A run with
        no response is scored 0 without calling the grader.
    """
    # A run whose target raised has no outputs; score it instead of crashing.
    # NOTE(review): 0 is assumed to be the grader's lowest score - confirm
    # against the hub prompt's scale.
    prediction = (run.outputs or {}).get("response")
    if prediction is None:
        return {"key": "answer_v_reference_score", "score": 0}

    input_question = example.inputs["input"]
    reference = example.outputs["output"]

    llm = ChatOpenAI(model="gpt-4", temperature=0)
    grade_prompt_answer_accuracy = hub.pull("langchain-ai/rag-answer-vs-reference")
    answer_grader = grade_prompt_answer_accuracy | llm

    score = answer_grader.invoke({
        "question": input_question,
        "correct_answer": reference,
        "student_answer": prediction,
    })
    score = score["Score"]
    return {"key": "answer_v_reference_score", "score": score}

def check_trajectory_custom(root_run: "Run", example: "Example") -> dict:
    """
    Check if all expected tools are called in exact order and without any additional tool calls.

    Two trajectories are accepted: with the web-search detour and without it.

    Args:
        root_run: Evaluated run; ``root_run.outputs["steps"]`` is the recorded
            list of step names.
        example: Dataset example (not used).

    Returns:
        ``{"score": 1 or 0, "key": "tool_calls_in_exact_order"}``. A run with
        no outputs or no ``steps`` entry scores 0.
    """
    expected_trajectories = (
        [
            "retrieve_documents",
            "grade_document_retrieval",
            "web_search",
            "generate_answer",
        ],
        [
            "retrieve_documents",
            "grade_document_retrieval",
            "generate_answer",
        ],
    )

    # A run whose target raised has no outputs; treat that as "no trajectory"
    # rather than failing inside the evaluator.
    tool_calls = (root_run.outputs or {}).get("steps")
    print(f"Tool calls custom agent: {tool_calls}")
    score = 1 if tool_calls in expected_trajectories else 0

    return {"score": score, "key": "tool_calls_in_exact_order"}

def run_evaluation(dataset_name):
    """Evaluate the custom agent on a LangSmith dataset.

    ``predict_custom_agent_answer`` is run against the dataset and scored by
    ``answer_evaluator`` and ``check_trajectory_custom``, with three
    repetitions and a concurrency of one.

    Args:
        dataset_name: Name of the LangSmith dataset to evaluate on.

    Returns:
        Whatever ``evaluate`` returns for the experiment.
    """
    model_tested = "llama3.1"
    evaluation_settings = dict(
        data=dataset_name,
        evaluators=[answer_evaluator, check_trajectory_custom],
        experiment_prefix=f"custom-agent-{model_tested}" + "-answer-and-tool-use",
        num_repetitions=3,
        max_concurrency=1,
        metadata={"version": "CRAG, llama3.1"},
    )
    return evaluate(predict_custom_agent_answer, **evaluation_settings)

### 7: Main Execution

In [11]:
if __name__ == "__main__":
    # Driver cell. Everything assigned here (retriever, web_search_tool,
    # rag_chain, retrieval_grader, custom_graph) becomes a module-level name;
    # the graph node functions and predict_custom_agent_answer look these
    # names up as globals, so this cell must run before the graph is invoked.

    # Step 1: Initialize the system
    print("Initializing the system...")
    retriever = load_and_process_data(file_path)
    web_search_tool, rag_chain, retrieval_grader = initialize_models()

    # Step 2: Build the graph
    print("Building the graph...")
    custom_graph = build_graph()

    # Step 3: Run example queries
    print("\nRunning example queries:")
    # Each example uses the same {"input": ...} shape that
    # predict_custom_agent_answer reads.
    examples = [
        {"input": "What AWS services can I use to host a Python web app?"},
        {"input": "What is AWS EC2?"}
    ]
    
    # Print the question, the generated answer, and the steps the graph took.
    for i, example in enumerate(examples, 1):
        print(f"\nExample {i}:")
        response = predict_custom_agent_answer(example)
        print(f"Question: {example['input']}")
        print(f"Answer: {response['response']}")
        print(f"Steps: {response['steps']}")

    # Step 4: Set up and run LangSmith evaluation
    # (Disabled: uncomment the lines below to create the dataset if needed and
    # run the evaluators against it.)
    # print("\nSetting up LangSmith evaluation...")
    # dataset_name = setup_langsmith_evaluation()
    
    # print("\nRunning evaluation...")
    # evaluation_results = run_evaluation(dataset_name)
    
    # print("\nEvaluation complete. Results:")
    # print(evaluation_results)

    # print("\nYou can view detailed results in the LangSmith dashboard.")

Initializing the system...
Building the graph...

Running example queries:

Example 1:
Question: What AWS services can I use to host a Python web app?
Answer: You can use the following AWS services to host a Python web app:

* Amazon Elastic Compute Cloud (EC2)
* AWS Elastic Beanstalk
* AWS App Runner
* CodePipeline and GitHub for CI/CD pipeline automation.

These services provide scalable, secure, and managed infrastructure for deploying and running your Python application.
Steps: ['retrieve_documents', 'grade_document_retrieval', 'web_search', 'generate_answer']

Example 2:
Question: What is AWS EC2?
Answer: Amazon EC2 (Elastic Compute Cloud) is a web service that provides secure, resizable compute capacity in the cloud. It allows developers to obtain and configure capacity with minimal friction, providing complete control over computing resources. Amazon EC2 reduces the time required to obtain and boot new server instances to minutes, allowing for quick scaling of capacity as needed