# ThemeParkAssist
### Local LangGraph Retrieval-Augmented Generation (RAG) Agent with Llama 3 and Theme Park Journey Orchestration

This notebook demonstrates a Retrieval-Augmented Generation (RAG) agent for theme park visit journey orchestration. It is built with LangGraph and a locally served Llama 3 model, uses data collected with Selenium, and falls back to a third-party search API (Tavily). The agent processes questions from theme-park visitors, retrieves relevant documents, and generates personalized recommendations. The architecture applies reflection, planning, and adaptive routing to drive the journey routing and event recommendations unique to each park visit. Apart from the Tavily fallback search, the system is designed to run on local infrastructure, supporting use cases where privacy or custom agents are required for in-depth question answering, journey orchestration, and information retrieval on private data.

This notebook builds upon the work presented in the [LangGraph RAG Agent notebook](https://colab.research.google.com/github/meta-llama/llama-recipes/blob/main/recipes/use_cases/agents/langchain/langgraph-rag-agent-local.ipynb), adapting it to a more complete multi-step query-routing system where data is often collected or published at different intervals. 

The architecture incorporates the following strategies from recent research:

- **Adaptive Routing** ([Adaptive RAG](https://arxiv.org/abs/2403.14403)): Efficiently routes questions to the most appropriate retrieval mechanism.
- **Fallback Strategy** ([Corrective RAG](https://arxiv.org/pdf/2401.15884.pdf)): Leverages fallback web searches when retrieved documents are not directly relevant.
- **Self-Correction** ([Self-RAG](https://arxiv.org/abs/2310.11511)): Mitigates hallucinations and incorrect answers by re-evaluating generations.

## Key Concepts

- **Reflection**: The agent can reflect on its own retrieval results from theme park related information, deciding whether its generation addresses the question effectively.
- **Planning**: Control flow is planned through a LangGraph state machine that handles query routing and tool usage.
- **Tool Use**: The agent can invoke external tools (e.g., web search) when retrieval results fall short of addressing the query.
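
The three concepts above can be pictured as a single control loop. The following is a minimal plain-Python sketch, not the LangGraph implementation used later in this notebook. The function `answer_question` and every callable passed to it are hypothetical stand-ins for the router, retriever, graders, and generator, and `max_attempts` is an assumed safeguard that the notebook itself does not define.

```python
from typing import Callable, List


def answer_question(
    question: str,
    route: Callable[[str], str],  # returns "vectorstore" or "web_search"
    retrieve: Callable[[str], List[str]],
    is_relevant: Callable[[str, str], bool],
    web_search: Callable[[str], List[str]],
    generate: Callable[[str, List[str]], str],
    is_grounded: Callable[[str, List[str]], bool],
    is_useful: Callable[[str, str], bool],
    max_attempts: int = 3,  # assumed cap, not part of the notebook
) -> str:
    # Planning: choose the first tool based on the router's decision.
    if route(question) == "vectorstore":
        retrieved = retrieve(question)
        docs = [d for d in retrieved if is_relevant(question, d)]
        # Tool use: supplement with web search if any document was filtered out.
        if len(docs) < len(retrieved):
            docs += web_search(question)
    else:
        docs = web_search(question)

    answer = ""
    for _ in range(max_attempts):
        answer = generate(question, docs)
        # Reflection: regenerate when the answer is not grounded in the documents.
        if not is_grounded(answer, docs):
            continue
        if is_useful(question, answer):
            return answer
        # Grounded but unhelpful: gather more context and try again.
        docs = docs + web_search(question)
    return answer
```

Passing the components in as callables keeps the sketch independent of any particular LLM or vector store; in the notebook the same decisions are made by graph nodes and conditional edges.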

## Overview of Technologies

This setup combines locally run components with one external search API:

- **Automated Data Collection**: [Selenium](https://www.selenium.dev), web automation project.
- **LLM**: [Llama 3](https://ollama.ai/library/llama3), provided by the Ollama platform.
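- **Embeddings**: [GPT4All Embeddings](https://blog.nomic.ai/posts/nomic-embed-text-v1) for efficient document retrieval. Note that the vector store cell in this notebook currently instantiates `HuggingFaceEmbeddings()` instead.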
- **Search API**: Integration with [Tavily](https://tavily.com/#api) for fallback searches.

The notebook walks through setting up the local environment, defining the state machine, and testing the agent with user questions to illustrate the functionality and flexibility of this RAG agent architecture.

The notebook extends the use case by including personalized journey recommendations as a park visitor progresses through the park, with their events and actions, location, and sentiment used to help facilitate a next-best-event. 

To Do: Create an endpoint to mock a lat/lon for a park visitor, create mock events/actions, evaluate ways of generating metadata from extracted content going into document/vector/graph stores, and create a tool that books a table at a park restaurant and includes any personalization requests or allergies.


In [1]:
# imports needed for data collection
from datetime import datetime
import dateutil.relativedelta
import os
import re
import json
import time
import urllib.request
import sqlalchemy
from IPython.display import clear_output
import pandas as pd
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, ElementNotInteractableException, TimeoutException
from PIL import Image

import ast  # for converting embeddings saved as strings back to arrays

import tiktoken  # for counting tokens
from scipy import spatial  # for calculating vector similarities for search

from openai import OpenAI
from pinecone import Pinecone, ServerlessSpec
from time import sleep
from tqdm.auto import tqdm

from typing import Optional

from langchain.chains import create_extraction_chain
from pydantic import BaseModel, Field, ValidationError
from langchain.prompts.chat import (
    ChatPromptTemplate,
    HumanMessagePromptTemplate,
    SystemMessagePromptTemplate,
)
from langchain.schema import HumanMessage, SystemMessage, Document
from langchain_openai import ChatOpenAI

from dotenv import load_dotenv

load_dotenv()

OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')



In [2]:
def init_chrome_browser(url):
  download_path="/tmp/downloads"
  # chrome_driver_path="/tmp/chromedriver/chromedriver-linux64/chromedriver"
  chrome_driver_path="/opt/homebrew/bin/chromedriver"
  options = Options()
  prefs = {'download.default_directory' : download_path, 
           'profile.default_content_setting_values.automatic_downloads': 1, 
           "download.prompt_for_download": False, 
           "download.directory_upgrade": True,
           "safebrowsing.enabled": True,
           "translate_whitelists": {"vi":"en"},
           "translate":{"enabled":"true"}}
  options.add_experimental_option('prefs', prefs)
  options.add_argument('--no-sandbox')
  # options.add_argument('--headless')    # enable on hosts without a display (e.g. Databricks), where Chrome cannot open a window
  options.add_argument('--disable-dev-shm-usage')
  options.add_argument('--start-maximized')
  options.add_argument('window-size=2000,4000')
  options.add_argument('--ignore-certificate-errors')
  options.add_argument('--ignore-ssl-errors')
  options.add_argument('--lang=en')
  options.add_experimental_option('excludeSwitches', ['enable-logging'])
  print(f"{datetime.now()}    Launching Chrome...")
  browser = webdriver.Chrome(service=Service(chrome_driver_path), options=options)
  print(f"{datetime.now()}    Chrome launched.")
  browser.get(url)
  print(f"{datetime.now()}    Browser ready to use.")
  return browser

In [3]:
# Function to take a screenshot
def take_screenshot(filename):
    driver.save_screenshot(filename)  # Saves screenshot as an image file

In [4]:
print("Starting chrome browser.")
url = "https://disneyland.disney.go.com"
driver = init_chrome_browser(
    url=url
)
print(f"Starting chrome browser: URL initialized. {url}")
time.sleep(10)
take_screenshot('disneyhomepageX.png')
driver.quit()  # close this browser; the next cell launches a fresh one per URL

Starting chrome browser.
2024-09-30 11:38:22.305619    Launching Chrome...
2024-09-30 11:38:23.955304    Chrome launched.
2024-09-30 11:38:32.593240    Browser ready to use.
Starting chrome browser: URL initialized. https://disneyland.disney.go.com
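
The fixed `time.sleep(10)` above always waits the full ten seconds, even when the page is ready sooner. As a hedged alternative, the sketch below polls the browser until `document.readyState` reports `complete` or a timeout expires. The helper name `wait_for_page_ready` and its `timeout` and `poll` parameters are hypothetical; it only assumes the driver exposes `execute_script`, as Selenium WebDriver instances do. A page that renders content with JavaScript after load may still need an explicit condition via `WebDriverWait` and `expected_conditions`, both already imported in the first cell.

```python
import time


def wait_for_page_ready(driver, timeout: float = 10.0, poll: float = 0.25) -> bool:
    """Poll until the page reports readyState == 'complete' or the timeout expires.

    Returns True if the page became ready, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        # document.readyState is 'loading', 'interactive', or 'complete'
        if driver.execute_script("return document.readyState") == "complete":
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll)
```

With a helper like this, `time.sleep(10)` could be replaced by `wait_for_page_ready(driver, timeout=10)`, which returns as soon as the page has loaded.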


In [5]:
# List of URLs to process
urls = [
    "https://disneyland.disney.go.com/",
    "https://disneyland.disney.go.com/experience-updates/",
    "https://disneyland.disney.go.com/lightning-lane-passes/",
    "https://disneyland.disney.go.com/hotels/",
    "https://disneyland.disney.go.com/admission/tickets/",
]

# Initialize an empty list to store documents
docs_list = []

# Process through multiple URLs
for url in urls:
    print(f"Starting chrome browser for URL: {url}")
    driver = init_chrome_browser(url)
    
    # Give time for the page to load
    ### Update to expected wait conditions
    time.sleep(10)
    
    # Take screenshot for reference (optional)
    screenshot_name = f'screenshot_{url.split("/")[-2]}.png'
    take_screenshot(screenshot_name)
    print(f"Screenshot saved as {screenshot_name}")
    
    # Get page source and parse with BeautifulSoup
    page_source = driver.page_source
    soup = BeautifulSoup(page_source, "html.parser")
    
    # Extract all the text from the page
    all_text = soup.get_text(separator=" ", strip=True)
    print(f"Extracted text from {url}: {all_text[:200]}...")  # Print the first 200 characters of extracted text

    ####  
    #### To Do: Generate summary and capture core details from the source text, and meta to store as vectors, use a pydantic model,
    ####         and collect the URLs from the page_source that might be relevant to the chunk - chunking plays a role, as well as result set
    ####
    
    
    # Append the text and metadata to the docs_list
    docs_list.append(Document(page_content=all_text, metadata={"source": url}))
    
    # Close the browser after processing the URL
    driver.quit()

# process the extracted documents (chunking, vector store, etc.)
# split the text using LangChain's text splitter:
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=400, chunk_overlap=50
)
doc_splits = text_splitter.split_documents(docs_list)
print(f"Total number of chunks after splitting: {len(doc_splits)}")

# Optionally, add the documents to Chroma vector store
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings

vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="rag-chroma-2",
    embedding=HuggingFaceEmbeddings(),
)

collection = vectorstore._client.get_collection("rag-chroma-2")
print(f"Number of vectors in the collection: {collection.count()}")

Starting chrome browser for URL: https://disneyland.disney.go.com/
2024-09-30 11:38:43.249501    Launching Chrome...
2024-09-30 11:38:43.805535    Chrome launched.
2024-09-30 11:38:46.736740    Browser ready to use.
Screenshot saved as screenshot_disneyland.disney.go.com.png
Extracted text from https://disneyland.disney.go.com/: Disneyland Resort in Anaheim, California – Official Site   Visit Disney.com Skip Navigation Log In or Create Account United States (English) Help Search Navigation Links Tickets & Parks Tickets & Pa...
Starting chrome browser for URL: https://disneyland.disney.go.com/experience-updates/
2024-09-30 11:38:57.699257    Launching Chrome...
2024-09-30 11:38:58.217895    Chrome launched.
2024-09-30 11:39:01.213680    Browser ready to use.
Screenshot saved as screenshot_experience-updates.png
Extracted text from https://disneyland.disney.go.com/experience-updates/: What To Know Before You Go | Disneyland Resort Visit Disney.com Skip Navigation Log In or Create Accou



Number of vectors in the collection: 33
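
The splitter in the cell above is configured with `chunk_size=400` and `chunk_overlap=50`, both measured in tiktoken tokens. To show what the overlap means, here is a simplified sliding-window sketch over an already tokenized sequence. It is an illustration only: `chunk_tokens` is a hypothetical helper, and `RecursiveCharacterTextSplitter` first splits on separators such as paragraph and line breaks, so its chunk boundaries will differ from these.

```python
from typing import List, Sequence


def chunk_tokens(tokens: Sequence, chunk_size: int = 400, chunk_overlap: int = 50) -> List[list]:
    """Split tokens into windows of chunk_size that share chunk_overlap tokens."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap  # each window starts this far after the previous one
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(list(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            break  # the last window already reaches the end of the input
    return chunks


# Integers stand in for token ids.
chunks = chunk_tokens(list(range(1000)))
print([len(c) for c in chunks])
```

Each chunk repeats the final 50 tokens of the previous one, so a sentence that straddles a boundary still appears whole in at least one chunk.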


In [6]:
### Retrieval Grader 
### LLM

local_llm = 'llama3'

retriever = vectorstore.as_retriever()

from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

# LLM
llm = ChatOllama(model=local_llm, format="json", temperature=0)

prompt = PromptTemplate(
    template="""You are a grader assessing relevance 
    of a retrieved document to a user question. If the document contains keywords related to the user question, 
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. 
    
    Give a binary score 'yes' or 'no' to indicate whether the document is relevant to the question.
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
     
    Here is the retrieved document: 
    {document}
    
    Here is the user question: 
    {question}
    """,
    input_variables=["question", "document"],
)

retrieval_grader = prompt | llm | JsonOutputParser()
question = "How much are tickets to Disneyland?"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

{'score': 'yes'}
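
The grader returns a small JSON object, and later cells compare its `score` value against `"yes"`. A local model does not always follow the format exactly, so a tolerant parser can be useful. The sketch below is one possible approach; `is_yes` is a hypothetical helper, and the malformed shapes it guards against (missing key, different casing, a boolean instead of a string) are assumptions about what a model might emit rather than observed outputs.

```python
from typing import Any


def is_yes(result: Any, key: str = "score") -> bool:
    """Return True only when the grader output clearly says yes."""
    if not isinstance(result, dict):
        return False  # e.g. the parser returned a string or None
    value = result.get(key)
    if isinstance(value, bool):
        return value
    return isinstance(value, str) and value.strip().lower() == "yes"


print(is_yes({"score": "yes"}))
print(is_yes({"score": "No"}))
```

Treating anything unrecognized as "no" is a conservative choice: an unparseable grade then triggers the fallback path instead of raising a `KeyError`.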


In [7]:
### Generate

from langchain.prompts import PromptTemplate
from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Prompt
prompt = PromptTemplate(
    template="""You are an assistant for question-answering tasks. 
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. 
    Use three sentences maximum and keep the answer concise:
    Question: {question} 
    Context: {context} 
    Answer: 
    """,
    input_variables=["question", "context"],
)

llm = ChatOllama(model=local_llm, temperature=0)

# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Chain
rag_chain = prompt | llm | StrOutputParser()

# Run
question = "What ride should I go on at Disneyland?"
docs = retriever.invoke(question)
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

I'd be happy to help! Based on the provided context, it seems that Disneyland offers Lightning Lane passes which allow you to skip standby lines for select attractions. You can choose from a variety of eligible attractions in both parks or opt for a single pass to ride Star Wars: Rise of the Resistance or Radiator Springs Racers.
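
The cell above defines `format_docs` but invokes the chain with the raw list of documents, so the prompt receives the list's string representation. The sketch below contrasts the two forms using a stand-in class; `Doc` is a hypothetical substitute for LangChain's `Document`, used here only so the example runs without any dependencies, and the sample texts are made up.

```python
from dataclasses import dataclass


@dataclass
class Doc:
    """Hypothetical stand-in for a LangChain Document."""
    page_content: str


def format_docs(docs):
    # Same helper as in the cell above: join page contents with blank lines.
    return "\n\n".join(doc.page_content for doc in docs)


sample_docs = [Doc("Lightning Lane passes let you skip standby lines."),
               Doc("Ticket prices vary by date.")]

as_repr = str(sample_docs)          # what a prompt sees when given the raw list
as_text = format_docs(sample_docs)  # clean text with one passage per paragraph
print(as_text)
```

One way to apply this in the notebook would be `rag_chain.invoke({"context": format_docs(docs), "question": question})`, which keeps class names and metadata out of the prompt.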


In [8]:
### Hallucination Grader 

# LLM
llm = ChatOllama(model=local_llm, format="json", temperature=0)

# Prompt
prompt = PromptTemplate(
    template="""You are a grader assessing whether an answer is grounded in / supported by a set of facts. \n 
    Here are the facts:
    \n ------- \n
    {documents} 
    \n ------- \n
    Here is the answer: {generation}
    Give a binary score 'yes' or 'no' to indicate whether the answer is grounded in / supported by a set of facts. \n
    Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.""",
    input_variables=["generation", "documents"],
)

hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader.invoke({"documents": docs, "generation": generation})

{'score': 'yes'}

In [9]:
### Answer Grader 

# LLM
llm = ChatOllama(model=local_llm, format="json", temperature=0)

# Prompt
prompt = PromptTemplate(
    template="""You are a grader assessing whether an 
    answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is 
    useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
     
    Here is the answer:
    {generation} 

    Here is the question: {question}
    """,
    input_variables=["generation", "question"],
)

answer_grader = prompt | llm | JsonOutputParser()
answer_grader.invoke({"question": question,"generation": generation})

{'score': 'yes'}

In [10]:
### Router

from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

# LLM
llm = ChatOllama(model=local_llm, format="json", temperature=0)

prompt = PromptTemplate(
    template="""You are an expert at routing a 
    user question to a vectorstore or web search. Use the vectorstore for questions on Disneyland general overview and fast pass questions, park tickets and hotels. You do not need to be stringent with the keywords 
    in the question related to these topics. Otherwise, use web search. Give a binary choice 'web_search' 
    or 'vectorstore' based on the question. Return a JSON with a single key 'datasource' and 
    no preamble or explanation. 
    
    Question to route: 
    {question}""",
    input_variables=["question"],
)

question_router = prompt | llm | JsonOutputParser()
question = "What are some good places to visit in Southern California?"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(question_router.invoke({"question": question}))



{'datasource': 'web_search'}


In [11]:
### Search
import os

# os.environ['LANGCHAIN_TRACING_V2'] = 'true'
# os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'
# os.environ['LANGCHAIN_API_KEY'] = 'LANGCHAIN_API_KEY'

TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
from langchain_community.tools.tavily_search import TavilySearchResults

web_search_tool = TavilySearchResults(max_results=5)
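
The `web_search` node defined in the next cell joins the `content` field of each search result. The sketch below shows a defensive version of that step. It assumes, as the notebook's own code does, that results arrive as a list of dicts with a `content` key. The non-list case is an assumed failure mode (for example an error message returned as a string), and `results_to_text` is a hypothetical helper.

```python
from typing import Any, List


def results_to_text(results: Any) -> str:
    """Join the 'content' fields of search results, skipping malformed entries."""
    if not isinstance(results, list):
        return ""  # assumed failure mode: the tool returned something other than a list
    contents: List[str] = []
    for item in results:
        if isinstance(item, dict) and isinstance(item.get("content"), str):
            contents.append(item["content"])
    return "\n".join(contents)


# Made-up results, for illustration only.
sample = [{"url": "https://example.com/a", "content": "Park hours are posted daily."},
          {"url": "https://example.com/b"}]
print(results_to_text(sample))
```

Returning an empty string on failure lets the caller decide whether to skip the web document entirely instead of crashing inside the graph.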

In [12]:
from typing_extensions import TypedDict
from typing import List

### State

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: the user's question
        generation: LLM generation
        web_search: "Yes" or "No" flag indicating whether to add web search
        documents: list of retrieved Document objects
    """
    question : str
    generation : str
    web_search : str
    documents : List[Document]

from langchain.schema import Document

### Nodes

def retrieve(state):
    """
    Retrieve documents from vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}

def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    
    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]
    
    # Score each doc
    filtered_docs = []
    web_search = "No"
    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        grade = score['score']
        # Document relevant
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        # Document not relevant
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # We do not include the document in filtered_docs
            # We set a flag to indicate that we want to run web search
            web_search = "Yes"
            continue
    return {"documents": filtered_docs, "question": question, "web_search": web_search}
    
def web_search(state):
    """
    Web search based on the question

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Appended web results to documents
    """

    print("---WEB SEARCH---")
    question = state["question"]
    documents = state.get("documents")  # may be absent when the router sends the question straight to web search

    # Web search
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)
    if documents is not None:
        documents.append(web_results)
    else:
        documents = [web_results]
        
    print("Documents:")
    print(documents[:850])
    print("Question:")
    print(question)
    return {"documents": documents, "question": question}

### Conditional edge

def route_question(state):
    """
    Route question to web search or RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    print(question)
    source = question_router.invoke({"question": question})  
    print(source)
    print(source['datasource'])
    if source['datasource'] == 'web_search':
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
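    elif source['datasource'] == 'vectorstore':
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"
    else:
        # Unexpected router output: fall back to web search instead of returning None
        print("---UNEXPECTED DATASOURCE, FALLING BACK TO WEB SEARCH---")
        return "websearch"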

def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    question = state["question"]
    web_search = state["web_search"]
    filtered_documents = state["documents"]

    if web_search == "Yes":
        # At least one retrieved document was graded as not relevant,
        # so we supplement the remaining documents with a web search
        print("---DECISION: NOT ALL DOCUMENTS ARE RELEVANT TO QUESTION, INCLUDE WEB SEARCH---")
        return "websearch"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"

### Conditional edge

def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score['score']

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question,"generation": generation})
        grade = score['score']
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

from langgraph.graph import END, StateGraph
workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("websearch", web_search) # web search
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade documents
workflow.add_node("generate", generate) # generate

<langgraph.graph.state.StateGraph at 0x32b86a150>

In [13]:
# Build graph
workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "websearch",
        "vectorstore": "retrieve",
    },
)

workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)

<langgraph.graph.state.StateGraph at 0x32b86a150>
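
In the edge map above, a "not supported" result routes back to `generate`, and "not useful" routes to `websearch` and then `generate` again. If a grader keeps rejecting the answer, the graph keeps cycling until LangGraph stops it at its recursion limit. One way to bound this is sketched below in plain Python. Everything here is hypothetical: the `attempts` state field, the `give_up` label (which would need its own entry such as `"give_up": END` in the edge map), and both helper functions.

```python
from typing import Callable, Dict


def count_attempts(state: Dict, generate: Callable[[Dict], Dict]) -> Dict:
    """Wrap a generate node so each call increments an 'attempts' counter in state."""
    update = dict(generate(state))
    update["attempts"] = state.get("attempts", 0) + 1
    return update


def bounded_decision(state: Dict, decide: Callable[[Dict], str], max_attempts: int = 3) -> str:
    """Return the grader's decision, or 'give_up' once the attempt budget is spent."""
    decision = decide(state)
    if decision != "useful" and state.get("attempts", 0) >= max_attempts:
        return "give_up"
    return decision
```

The counter lives in the graph state rather than in a global variable, so each question starts with a fresh budget.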

In [15]:
# Compile
app = workflow.compile()

# Test
from pprint import pprint
inputs = {"question": "Does Disney still have fast passes?"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])

---ROUTE QUESTION---
Does Disney still have fast passes?
{'datasource': 'vectorstore'}
vectorstore
---ROUTE QUESTION TO RAG---
---RETRIEVE---
'Finished running: retrieve:'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
'Finished running: grade_documents:'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
('Yes, Disney still has FastPasses. They have been rebranded as "Lightning '
 'Lane" passes and offer a way to reserve a spot in line for popular '
 'attractions at Disneyland Resort.')
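
The test loop above prints `value["generation"]` from whichever node ran last, which relies on the loop variable surviving after the loop and on the last update containing a `generation` key. A slightly more defensive way to collect the answer is sketched below. It assumes each streamed event is a dict that maps a node name to that node's state update, as the loop above does; `final_generation` is a hypothetical helper and the sample events are made up.

```python
from typing import Any, Dict, Iterable, Optional


def final_generation(events: Iterable[Dict[str, Dict[str, Any]]]) -> Optional[str]:
    """Return the most recent non-empty 'generation' seen in a stream of node updates."""
    generation = None
    for event in events:
        for node_name, update in event.items():
            if isinstance(update, dict) and update.get("generation"):
                generation = update["generation"]
    return generation


# Made-up events shaped like the stream output above.
sample_events = [
    {"retrieve": {"documents": ["doc"], "question": "q"}},
    {"grade_documents": {"documents": ["doc"], "question": "q", "web_search": "No"}},
    {"generate": {"documents": ["doc"], "question": "q", "generation": "An answer."}},
]
print(final_generation(sample_events))
```

In the notebook this could be called as `final_generation(app.stream(inputs))`, and it returns `None` instead of raising if no node produced a generation.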
