In [1]:
import os
from langchain_groq import ChatGroq
from langchain_tavily import TavilySearch
from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools import Tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
import requests 
from bs4 import BeautifulSoup
import re

In [2]:
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage


In [None]:
# Fill in placeholder credentials only when the variables are not already set.
# `setdefault` never overwrites an existing value, so real keys exported in the
# shell (or loaded from a .env file) are no longer clobbered by the placeholders.
# Export the real keys before starting the kernel, or replace the placeholders
# locally -- never commit real keys to the notebook.
os.environ.setdefault('GROQ_API_KEY', 'entergroqapikey')
os.environ.setdefault('TAVILY_API_KEY', 'entertavilyapikey')
# Same identifier that scrape_urls_from_results sends as its User-Agent header.
os.environ.setdefault('USER_AGENT', 'StateGraphRAGAgent/1.0')

In [4]:
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the search/RAG graph."""
    # Conversation messages; annotated with the `add_messages` reducer.
    messages: Annotated[List[BaseMessage], add_messages]
    # Question text; `extract_query` copies it from the last HumanMessage.
    query: str
    # Raw output of each search tool, keyed by tool name
    # ("tavily", "duckduckgo", "wikipedia"); None marks a failed search.
    search_results: dict
    # Name of the search tool that ran most recently.
    current_search_tool: str
    # Context text returned by the RAG retrieval step.
    rag_results: str
    # Answer text produced by `retrieve_and_respond`.
    final_answer: str

In [5]:
class RAGpipeline:
    """In-memory retrieval store.

    Text is split into overlapping chunks, embedded with a sentence-transformers
    model and indexed in a FAISS vector store that is created lazily on the
    first successful insert.
    """

    def __init__(self):
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=100
        )
        # Stays None until add_documents() indexes at least one chunk.
        self.vector_store = None

    def add_text_documents(self, text: str):
        """Index a raw text string.

        Args:
            text: Text to split, embed and store. Empty or whitespace-only
                input is ignored.

        Returns:
            None.
        """
        if not text or not text.strip():
            # Nothing to index; also avoids importing/embedding for no reason.
            return None

        from langchain.schema import Document
        doc = Document(page_content=text)
        return self.add_documents([doc])

    def add_documents(self, docs):
        """Split `docs` into chunks and add them to the vector store.

        Args:
            docs: Iterable of LangChain Document objects.

        Returns:
            None. When splitting yields no chunks the store is left untouched.
        """
        chunks = self.text_splitter.split_documents(docs)

        if not chunks:
            # FAISS cannot build an index from zero chunks, so skip the insert
            # instead of letting from_documents() raise.
            return None

        if self.vector_store is None:
            self.vector_store = FAISS.from_documents(chunks, self.embeddings)
        else:
            new_vector_store = FAISS.from_documents(chunks, self.embeddings)
            self.vector_store.merge_from(new_vector_store)
        return None

    def retrieve(self, query: str, k: int = 5, filter_type: str = None):
        """Return the `k` most similar chunks joined by blank lines.

        Args:
            query: Text to search for.
            k: Maximum number of chunks to return.
            filter_type: Currently unused; kept so existing callers that pass
                it keep working.

        Returns:
            The page contents of the matching chunks separated by "\\n\\n",
            or "" when nothing has been indexed yet.
        """
        if self.vector_store is None:
            return ""
        docs = self.vector_store.similarity_search(query, k=k)
        return "\n\n".join(doc.page_content for doc in docs)

# Module-level pipeline instance: `store_in_rag` writes to it and
# `retrieve_and_respond` reads from it.
rag_pipeline = RAGpipeline()


In [6]:
# Chat model invoked by `retrieve_and_respond`; the API key is read from the
# GROQ_API_KEY environment variable.
model = ChatGroq(
    model_name="llama-3.1-8b-instant",
    temperature=0,
    api_key=os.getenv('GROQ_API_KEY')
)

# Search tools; the graph tries them in this order: Tavily, DuckDuckGo, Wikipedia.
tavily_search = TavilySearch(max_results=5)
# NOTE(review): confirm that DuckDuckGoSearchRun honours `max_results` directly;
# the limit may need to be set on its API wrapper instead -- TODO verify.
duckduckgo_search = DuckDuckGoSearchRun(max_results=5)
wikipedia_search = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper(top_k_results=3))

In [7]:
def extract_query(state):
    """Copy the text of the last message into ``state["query"]``.

    The query is only updated when the last message is a HumanMessage. A state
    with no messages (missing key or empty list) is returned unchanged instead
    of raising.

    Args:
        state: Graph state dictionary.

    Returns:
        The same state dictionary, updated in place.
    """
    messages = state.get("messages") or []
    if not messages:
        # Nothing to extract; leave any existing query as it is.
        return state

    last_message = messages[-1]
    if isinstance(last_message, HumanMessage):
        state["query"] = last_message.content
    return state

In [8]:
def Tavily_search(state):
    """Run the Tavily search tool for ``state["query"]``.

    The tool output is stored under ``state["search_results"]["tavily"]``.
    Anything unusable is stored as None so that `check_search_success` routes
    to the next search tool: a raised exception, an empty result, an error
    payload, or a payload whose "results" list is empty.

    Args:
        state: Graph state dictionary; must contain "query".

    Returns:
        The same state dictionary, updated in place.
    """
    state.setdefault("search_results", {})
    try:
        results = tavily_search.run(state["query"])

        if not results:
            raise ValueError("no results returned")
        if isinstance(results, dict):
            # The tool may report API problems as a dict carrying an "error"
            # key instead of raising (verify against the installed
            # langchain-tavily version). Such a dict is truthy, so without this
            # check it would be treated as a successful search.
            if results.get("error"):
                raise RuntimeError(results["error"])
            if "results" in results and not results["results"]:
                raise ValueError("no results returned")

        state["search_results"]["tavily"] = results
        state["current_search_tool"] = "tavily"
        print(f"Tavily search successful for: {state['query']}")
    except Exception as e:
        # Best effort: record the failure and let the graph fall back.
        state["search_results"]["tavily"] = None
        state["current_search_tool"] = "tavily"
        print(f"Tavily search failed: {str(e)}")
    return state

In [9]:
def Duckduckgo_search(state):
    """Run the DuckDuckGo search tool for ``state["query"]``.

    The tool output is stored under ``state["search_results"]["duckduckgo"]``.
    A raised exception, an empty/blank result, or the wrapper's "nothing found"
    placeholder text is stored as None so that `check_search_success` routes to
    the next search tool.

    Args:
        state: Graph state dictionary; must contain "query".

    Returns:
        The same state dictionary, updated in place.
    """
    # Placeholder string the DuckDuckGo wrapper is expected to return when it
    # finds nothing (verify against the installed langchain_community version).
    # It is a non-empty string, so it would otherwise count as a success.
    no_result_marker = "No good DuckDuckGo Search Result was found"

    state.setdefault("search_results", {})
    try:
        results = duckduckgo_search.run(state["query"])

        # Normalise only for the check; the stored value is left untouched.
        probe = results.strip() if isinstance(results, str) else results
        if not probe or probe == no_result_marker:
            raise ValueError("no results returned")

        state["search_results"]["duckduckgo"] = results
        state["current_search_tool"] = "duckduckgo"
        print(f"DuckDuckGo search successful for: {state['query']}")
    except Exception as e:
        # Best effort: record the failure and let the graph fall back.
        state["search_results"]["duckduckgo"] = None
        state["current_search_tool"] = "duckduckgo"
        print(f"DuckDuckGo search failed: {str(e)}")
    return state

In [10]:
def Wikipedia_search(state):
    """Run the Wikipedia search tool for ``state["query"]``.

    The tool output is stored under ``state["search_results"]["wikipedia"]``.
    A raised exception, an empty/blank result, or the wrapper's "nothing found"
    placeholder text is stored as None so that `check_search_success` reports
    a failure.

    Args:
        state: Graph state dictionary; must contain "query".

    Returns:
        The same state dictionary, updated in place.
    """
    # Placeholder string the Wikipedia wrapper is expected to return when it
    # finds nothing (verify against the installed langchain_community version).
    # It is a non-empty string, so it would otherwise count as a success and be
    # indexed as if it were real content.
    no_result_marker = "No good Wikipedia Search Result was found"

    state.setdefault("search_results", {})
    try:
        results = wikipedia_search.run(state["query"])

        # Normalise only for the check; the stored value is left untouched.
        probe = results.strip() if isinstance(results, str) else results
        if not probe or probe == no_result_marker:
            raise ValueError("no results returned")

        state["search_results"]["wikipedia"] = results
        state["current_search_tool"] = "wikipedia"
        print(f"Wikipedia search successful for: {state['query']}")
    except Exception as e:
        # Best effort: record the failure and let the graph continue.
        state["search_results"]["wikipedia"] = None
        state["current_search_tool"] = "wikipedia"
        print(f"Wikipedia search failed: {str(e)}")
    return state

In [11]:
def check_search_success(state):
    """Routing function: report whether the latest search produced anything.

    Looks up the entry of ``state["search_results"]`` that belongs to
    ``state["current_search_tool"]``.

    Returns:
        "success" when that entry is truthy, otherwise "failure".
    """
    tool_name = state["current_search_tool"]
    outcome = state["search_results"].get(tool_name)
    return "success" if outcome else "failure"

In [12]:
def _clean_url(url):
    """Strip sentence punctuation that the URL regex captures at the end of a match."""
    trailing = '.,;:!?\'"'
    url = url.rstrip(trailing)
    # Drop a closing parenthesis only when it is unbalanced, so links such as
    # ".../wiki/Foo_(bar)" keep theirs.
    while url.endswith(')') and url.count(')') > url.count('('):
        url = url[:-1].rstrip(trailing)
    return url


def _collect_urls(search_results):
    """Return the unique URLs found in the search results, in first-seen order."""
    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
    seen = set()
    urls = []

    def remember(candidate):
        if isinstance(candidate, str) and candidate and candidate not in seen:
            seen.add(candidate)
            urls.append(candidate)

    for results in search_results.values():
        if not results:
            continue
        if isinstance(results, dict) and 'results' in results:
            # Structured results: each item is expected to be a dict with "url".
            for item in results['results'] or []:
                if isinstance(item, dict):
                    remember(item.get('url'))
        elif isinstance(results, str):
            # Free-text results: pull anything that looks like a link.
            for match in re.findall(url_pattern, results):
                remember(_clean_url(match))
    return urls


def _fetch_page_text(url, timeout, max_chars):
    """Download `url` and return its visible text, or None on a non-200 response."""
    response = requests.get(url, timeout=timeout, headers={
        'User-Agent': os.environ.get('USER_AGENT', 'StateGraphRAGAgent/1.0')
    })
    if response.status_code != 200:
        print(f"❌ HTTP {response.status_code} from {url}")
        return None

    soup = BeautifulSoup(response.content, 'html.parser')
    # Remove page chrome and non-content tags before extracting text.
    for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
        element.decompose()

    text = soup.get_text(separator=' ', strip=True)
    if len(text) > max_chars:
        text = text[:max_chars] + "..."
    return text


def scrape_urls_from_results(search_results, max_sites=5, timeout=15,
                             max_chars=50000, min_chars=100):
    """Fetch the pages referenced by the search results and return their text.

    URLs are taken from the "url" field of structured results and from links
    embedded in plain-text results. Duplicates are removed (keeping first-seen
    order) so the same page is not downloaded twice and does not use up the
    `max_sites` budget.

    Args:
        search_results: Mapping of tool name to that tool's raw output.
        max_sites: Maximum number of URLs to fetch.
        timeout: Per-request timeout in seconds.
        max_chars: Page text longer than this is truncated and suffixed "...".
        min_chars: Pages whose text is not longer than this are discarded.

    Returns:
        A list of strings of the form "Content from <url>:\\n<text>", one per
        page scraped successfully. Failures are printed and skipped.
    """
    scraped_content = []
    urls = _collect_urls(search_results or {})

    if not urls:
        print("❌ No URLs found to scrape")
        return scraped_content

    for url in urls[:max_sites]:
        try:
            text = _fetch_page_text(url, timeout, max_chars)
        except Exception as e:
            # Best effort: one bad page must not stop the remaining ones.
            print(f"❌ Failed to scrape {url}: {str(e)}")
            continue

        if text and len(text) > min_chars:
            scraped_content.append(f"Content from {url}:\n{text}")

    return scraped_content

In [13]:
def store_in_rag(state):
    """Index the collected search results, plus scraped pages, in the RAG store.

    Structured results (a dict with a "results" list) contribute one
    "Title/URL/Content" entry per item that has content; plain-text results are
    added as they are. All entries are joined and indexed as one text. The
    pages linked from the results are then scraped and indexed one by one;
    scraping problems are printed and do not interrupt the graph.

    Args:
        state: Graph state dictionary; reads "search_results".

    Returns:
        The same state dictionary, unchanged.
    """
    snippets = []

    for source, payload in state["search_results"].items():
        if not payload:
            continue

        if isinstance(payload, dict) and 'results' in payload:
            for hit in payload['results']:
                body = hit.get('content', '')
                heading = hit.get('title', '')
                link = hit.get('url', '')
                if not body:
                    continue
                entry = f"Title: {heading}\nURL: {link}\nContent: {body}"
                snippets.append(f"Source ({source}): {entry}")
        elif isinstance(payload, str):
            snippets.append(f"Source ({source}): {payload}")

    if snippets:
        rag_pipeline.add_text_documents("\n\n".join(snippets))

    try:
        pages = scrape_urls_from_results(state["search_results"], max_sites=5)
        if not pages:
            print("⚠️ No websites were scraped")
        else:
            for page in pages:
                rag_pipeline.add_text_documents(page)
    except Exception as e:
        print(f"❌ Scraping error: {e}")

    return state

In [14]:
def retrieve_and_respond(state: AgentState):
    """Answer the query from whatever the RAG store returns for it.

    Retrieves up to five chunks for ``state["query"]``. When context is found
    the chat model is asked to answer from it; a model error is reported as
    "Error: <message>". Without context the answer is "No information found.".

    Args:
        state: Graph state dictionary; reads "query".

    Returns:
        The same state dictionary with "rag_results" and "final_answer" set.
    """
    question = state["query"]
    context = rag_pipeline.retrieve(question, k=5)

    # Default answer, replaced below when there is context to work with.
    answer = "No information found."
    if context:
        prompt = f"Answer this question based on the information: {question}\n\nInformation: {context}"
        try:
            answer = model.invoke([HumanMessage(content=prompt)]).content
        except Exception as e:
            answer = f"Error: {e}"

    state["rag_results"] = context
    state["final_answer"] = answer
    return state

In [15]:
workflow = StateGraph(AgentState)

# Register every node under the name the edges below refer to.
for _node_name, _node_fn in (
    ("extract_query", extract_query),
    ("tavily_search", Tavily_search),
    ("duckduckgo_search", Duckduckgo_search),
    ("wikipedia_search", Wikipedia_search),
    ("store_in_rag", store_in_rag),
    ("retrieve_and_respond", retrieve_and_respond),
):
    workflow.add_node(_node_name, _node_fn)

workflow.set_entry_point("extract_query")
workflow.add_edge("extract_query", "tavily_search")

# Fallback chain: a successful search goes straight to indexing; a failed one
# moves on to the next tool, and the last tool falls through to answering.
for _search_node, _fallback_node in (
    ("tavily_search", "duckduckgo_search"),
    ("duckduckgo_search", "wikipedia_search"),
    ("wikipedia_search", "retrieve_and_respond"),
):
    workflow.add_conditional_edges(
        _search_node,
        check_search_success,
        {
            "success": "store_in_rag",
            "failure": _fallback_node,
        },
    )

workflow.add_edge("store_in_rag", "retrieve_and_respond")
workflow.add_edge("retrieve_and_respond", END)

app = workflow.compile()

In [16]:
def ask_question(question):
    """Run one question through the compiled graph and return the answer text.

    Args:
        question: The user's question.

    Returns:
        The "final_answer" entry of the state the graph returns.
    """
    final_state = app.invoke({
        "messages": [HumanMessage(content=question)],
        "query": "",
        "search_results": {},
        "current_search_tool": "",
        "rag_results": "",
        "final_answer": "",
    })
    return final_state["final_answer"]

In [21]:
# Demo: send one question through `ask_question` and print the answer.
question = "Who won FIFA World Cup 2022?"
print(f"Question: {question}")
print(f"Answer: {ask_question(question)}")

Question: Who won FIFA World Cup 2022?
Tavily search successful for: Who won FIFA World Cup 2022?
Answer: Argentina won the 2022 FIFA World Cup, defeating France in the final match. This was Argentina's third World Cup victory.
