In [None]:
!pip install -q langchain langchain-groq langchain-huggingface langchain-chroma langchain-community langchain_tavily
!pip install -q chromadb sentence-transformers requests wikipedia-api python-dotenv wikipedia
!pip install -qU beautifulsoup4 requests chromadb arxiv

In [None]:
!pip install langgraph

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
import os
import asyncio
from typing import TypedDict, Annotated, List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

In [None]:
from langchain_core.documents import Document
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool, StructuredTool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_groq import ChatGroq
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.tools import WikipediaQueryRun, ArxivQueryRun
from langchain_community.utilities import WikipediaAPIWrapper, ArxivAPIWrapper
from langchain_tavily import TavilySearch

In [None]:
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode

In [None]:
import os
from google.colab import userdata

# Copy each credential from the Colab `userdata` store into the process
# environment under the same name (same order as before: Groq, Tavily, HF).
for secret_name in ("GROQ_API_KEY", "TAVILY_API_KEY", "HF_TOKEN"):
    os.environ[secret_name] = userdata.get(secret_name)

In [None]:
# Duplicate of the ChatGroq import already made in the imports cell above.
from langchain_groq import ChatGroq

# Chat model instance shared by the notebook; answer_generation_node calls
# llm.invoke() on it to produce the final answer.
llm = ChatGroq(
    model="openai/gpt-oss-120b",
    temperature=0.3,
    max_tokens=2048
)

In [None]:
# Duplicate of the HuggingFaceEmbeddings import already made in the imports cell above.
from langchain_huggingface import HuggingFaceEmbeddings
# Embedding model handed to the Chroma vector store as its embedding_function.
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    # Forwarded to the encoder; requests normalized embedding vectors.
    encode_kwargs={'normalize_embeddings': True}
)

In [None]:
# Vector store used by vector_search_tool (reads), document_loader_tool (writes)
# and the routing/stat helpers (document count).
# NOTE(review): persist_directory points at ./chroma_db, so documents added in
# earlier runs presumably survive kernel restarts -- that would explain a
# document count higher than the number of URLs loaded in one session. Confirm.
vectorstore = Chroma(
    collection_name="parallel_rag",
    embedding_function=embeddings,
    persist_directory="./chroma_db"
)

In [None]:
@tool
def web_search_tool(query: str) -> str:
    """Search the web for current information using Tavily."""
    # The docstring above is kept verbatim: the @tool decorator is expected to
    # use it as the tool description.
    #
    # Returns one "Title / URL / Content" paragraph per hit, separated by blank
    # lines, with each content snippet cut to 500 characters. Returns an empty
    # string when the response carries no usable hits.
    tavily_search = TavilySearch(
        max_results=3,
        topic="general",
        # NOTE(review): raw content is requested but only the "content" field
        # is read below -- confirm whether this option is still needed.
        include_raw_content=True,
        search_depth="advanced"
    )
    response = tavily_search.invoke({"query": query})

    # Only a mapping can carry a "results" list; any other payload yields no hits
    # instead of raising AttributeError on `.get`.
    hits = response.get("results") if isinstance(response, dict) else None

    formatted_results = []
    for hit in hits or []:
        if not isinstance(hit, dict):
            continue
        # Missing or None fields degrade to empty text rather than aborting the
        # whole search with KeyError/TypeError.
        title = hit.get("title") or ""
        url = hit.get("url") or ""
        snippet = (hit.get("content") or "")[:500]
        formatted_results.append(f"Title: {title}\nURL: {url}\nContent: {snippet}...")

    return "\n\n".join(formatted_results)

In [None]:
@tool
def wikipedia_search_tool(query: str) -> str:
    """Search Wikipedia for encyclopedic information."""
    # Wrapper settings: top_k_results=2 and doc_content_chars_max=1000.
    api_wrapper = WikipediaAPIWrapper(top_k_results=2, doc_content_chars_max=1000)
    runner = WikipediaQueryRun(api_wrapper=api_wrapper)
    payload = {"query": query}
    return runner.invoke(payload)

In [None]:
@tool
def arxiv_search_tool(query: str) -> str:
    """Search arXiv for academic papers and research."""
    # Wrapper settings: top_k_results=2 and doc_content_chars_max=1000.
    api_wrapper = ArxivAPIWrapper(top_k_results=2, doc_content_chars_max=1000)
    runner = ArxivQueryRun(api_wrapper=api_wrapper)
    payload = {"query": query}
    return runner.invoke(payload)

In [None]:
@tool
def vector_search_tool(query: str) -> str:
    """Search the vector database for relevant documents."""
    # Nothing to search yet: tell the caller to load documents first.
    stored_count = vectorstore._collection.count()
    if stored_count == 0:
        return "Vector database is empty. Please load documents first."

    matches = vectorstore.similarity_search(query, k=3)
    # Each hit contributes its first 500 characters, always followed by "...".
    snippets = (f"Content: {match.page_content[:500]}..." for match in matches)
    return "\n\n".join(snippets)

In [None]:
@tool
def document_loader_tool(url: str) -> str:
    """Load and process documents from web URLs."""
    # The docstring above is kept verbatim: the @tool decorator is expected to
    # use it as the tool description.
    #
    # Loads `url` with WebBaseLoader and stores the resulting documents in the
    # shared `vectorstore`. Always returns a human-readable status string;
    # failures are reported in the string instead of being raised.
    import uuid  # function-scope: only needed for the deterministic ids below

    try:
        docs = WebBaseLoader([url]).load()
        if not docs:
            return f"No documents could be loaded from {url}. Nothing was added to the vector database."

        # Deterministic ids (URL + position): loading the same URL again reuses
        # the same ids, so the store is expected to overwrite the earlier entry
        # instead of accumulating a duplicate on every re-run of the notebook.
        doc_ids = [
            str(uuid.uuid5(uuid.NAMESPACE_URL, f"{url}#{position}"))
            for position in range(len(docs))
        ]
        vectorstore.add_documents(docs, ids=doc_ids)

        return f"Successfully loaded {len(docs)} documents from {url}. Added to vector database."
    except Exception as e:
        # Best-effort tool: surface the failure as text so callers can print it.
        return f"Error loading document from {url}: {str(e)}"

In [None]:
class ParallelRAGState(TypedDict):
    """State dictionary passed between the nodes of the parallel RAG graph."""
    # Conversation messages; merged with the `add_messages` reducer.
    messages: Annotated[list, add_messages]
    # Query text; set by query_analysis_node and read by the later nodes.
    query: str
    # Planned calls as {"tool": <tool name>, "query": <query>} dicts
    # (written by query_analysis_node).
    tool_calls: List[Dict[str, Any]]
    # Tool name -> result; parallel_tool_execution_node stores the same mapping
    # under both `tool_results` and `parallel_results`.
    tool_results: Dict[str, Any]
    parallel_results: Dict[str, Any]
    # Combined context text produced by result_fusion_node.
    final_context: str
    # Final answer text produced by answer_generation_node.
    answer: str

In [None]:
def route_and_select_tools(query: str) -> List[str]:
    """Pick the tools to run for a query using keyword heuristics.

    Args:
        query: The user's question.

    Returns:
        Tool names in a fixed order: vector search (only when the vector store
        holds documents), then web search, Wikipedia, and arXiv when their
        keywords occur in the query. If nothing was selected at all, falls back
        to web search plus Wikipedia.
    """
    import re  # function-scope: only needed for the whole-word "ai" check

    query_lower = query.lower()
    selected_tools = []

    # Always include vector search if we have documents
    if vectorstore._collection.count() > 0:
        selected_tools.append("vector_search_tool")

    # Add web search for current information
    if any(keyword in query_lower for keyword in ["latest", "recent", "current", "news", "2024", "2025"]):
        selected_tools.append("web_search_tool")

    # Add Wikipedia for general knowledge
    if any(keyword in query_lower for keyword in ["what is", "who is", "definition", "history", "concept"]):
        selected_tools.append("wikipedia_search_tool")

    # Add arXiv for research topics.
    # "ai" is matched as a whole word: the previous upper-case "AI" keyword could
    # never match the lower-cased query, and a plain substring test on "ai"
    # would fire on words such as "said" or "train".
    research_keywords = ["research", "study", "algorithm", "machine learning", "paper"]
    mentions_ai = re.search(r"\bai\b", query_lower) is not None
    if mentions_ai or any(keyword in query_lower for keyword in research_keywords):
        selected_tools.append("arxiv_search_tool")

    # Fallback when nothing matched.
    # NOTE(review): this only triggers when the list is empty, so once the vector
    # store has documents a keyword-less query runs vector search alone --
    # confirm that this is intended.
    if not selected_tools:
        selected_tools = ["web_search_tool", "wikipedia_search_tool"]

    return selected_tools

In [None]:
def execute_tools_in_parallel(tools_to_execute: List[str], query: str) -> Dict[str, Any]:
    """Run the named tools concurrently on a thread pool and collect their output.

    Args:
        tools_to_execute: Tool names to run. Names that are not one of the four
            search tools are skipped with a warning; repeated names run once.
        query: Query text passed to every tool as {"query": query}.

    Returns:
        Mapping of tool name to its result. A tool that raises is recorded as
        the string "Error: <message>" instead of propagating the exception.
        Returns an empty dict when there is nothing runnable.
    """
    tool_map = {
        "web_search_tool": web_search_tool,
        "wikipedia_search_tool": wikipedia_search_tool,
        "arxiv_search_tool": arxiv_search_tool,
        "vector_search_tool": vector_search_tool
    }

    # Resolve the runnable set up front so the pool is sized from real work.
    runnable = []
    for tool_name in tools_to_execute:
        if tool_name not in tool_map:
            print(f"⚠️ Skipping unknown tool: {tool_name}")
        elif tool_name not in runnable:
            runnable.append(tool_name)

    # ThreadPoolExecutor rejects max_workers=0, so bail out before creating it.
    if not runnable:
        print("⚠️ No known tools to execute.")
        return {}

    results = {}
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=len(runnable)) as executor:
        # Submit all tool executions
        future_to_tool = {
            executor.submit(tool_map[tool_name].invoke, {"query": query}): tool_name
            for tool_name in runnable
        }

        # Collect results as they complete
        for future in as_completed(future_to_tool):
            tool_name = future_to_tool[future]
            try:
                results[tool_name] = future.result()
                print(f"✓ {tool_name} completed in {time.time() - start_time:.2f}s")
            except Exception as e:
                # One failing tool must not discard the others' results.
                results[tool_name] = f"Error: {str(e)}"
                print(f"✗ {tool_name} failed: {str(e)}")

    total_time = time.time() - start_time
    print(f"\n🎯 Parallel execution completed in {total_time:.2f}s")

    return results

In [None]:
def query_analysis_node(state: ParallelRAGState):
    """Graph node: extract the query text and plan which tools to call.

    The query is the content of the last message when any messages exist,
    otherwise the state's "query" entry (empty string if absent). Returns the
    query together with one {"tool", "query"} entry per selected tool.
    """
    history = state["messages"]
    if history:
        query = history[-1].content
    else:
        query = state.get("query", "")

    # Keyword-based routing decides the tool set.
    selected_tools = route_and_select_tools(query)

    print(f"🔍 Query: {query}")
    print(f"🛠️  Selected tools: {', '.join(selected_tools)}")

    planned_calls = []
    for tool_name in selected_tools:
        planned_calls.append({"tool": tool_name, "query": query})

    return {"query": query, "tool_calls": planned_calls}

In [None]:
def parallel_tool_execution_node(state: ParallelRAGState):
    """Graph node: run every planned tool call concurrently.

    Reads "query" and "tool_calls" from the state and returns the tool-name ->
    result mapping under both "parallel_results" and "tool_results".
    """
    query = state["query"]

    # Only the tool names are needed; every call uses the same query.
    tools_to_execute = []
    for planned_call in state["tool_calls"]:
        tools_to_execute.append(planned_call["tool"])

    print(f"⚡ Executing {len(tools_to_execute)} tools in parallel...")

    outcome = execute_tools_in_parallel(tools_to_execute, query)

    # Both keys point at the same mapping object.
    return {"parallel_results": outcome, "tool_results": outcome}

In [None]:
def result_fusion_node(state: ParallelRAGState):
    """Graph node: merge the successful tool outputs into one context string.

    A result counts as successful when it is non-empty and its text does not
    start with "Error" (the prefix execute_tools_in_parallel uses for failed
    tools). Successful results are concatenated in the mapping's iteration
    order, each under a "=== TOOL NAME ===" header; no ranking is applied.

    Returns:
        {"final_context": <combined text>}; the text is empty when no tool
        succeeded.
    """
    parallel_results = state["parallel_results"]

    # Filter once and coerce to text so non-string results are handled the same
    # way in both the context and the success count.
    successful = {
        tool_name: str(result)
        for tool_name, result in parallel_results.items()
        if result and not str(result).startswith("Error")
    }

    # Add source attribution per tool
    combined_context = [
        f"=== {tool_name.upper().replace('_', ' ')} ===\n{text}\n"
        for tool_name, text in successful.items()
    ]

    # Create final context
    final_context = "\n".join(combined_context)

    print(f"📊 Fused results from {len(successful)} successful tools")

    return {
        "final_context": final_context
    }

In [None]:
def answer_generation_node(state: ParallelRAGState):
    """Graph node: ask the LLM to answer the query from the fused context.

    Returns the answer text under "answer" and the same text wrapped in an
    AIMessage under "messages".
    """
    query = state["query"]
    context = state["final_context"]

    # System instructions; {context} is filled in when the prompt is rendered.
    system_template = """You are an expert assistant that provides comprehensive, accurate answers using information from multiple sources.

Context from parallel tool execution:
{context}

Instructions:
1. Synthesize information from ALL available sources
2. Provide a comprehensive answer that addresses the query completely
3. Cite sources when possible (web, Wikipedia, arXiv, documents)
4. If sources conflict, acknowledge the discrepancy
5. Be specific and detailed in your response"""

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_template),
        ("human", "{query}"),
    ])

    rendered_messages = prompt.format_messages(context=context, query=query)
    response = llm.invoke(rendered_messages)

    print(f"✨ Generated comprehensive answer with {len(context)} characters of context")

    answer_text = response.content
    return {"answer": answer_text, "messages": [AIMessage(content=answer_text)]}

In [None]:
# Assemble the LangGraph pipeline as a straight line:
# START -> query_analysis -> parallel_execution -> result_fusion -> answer_generation -> END
workflow = StateGraph(ParallelRAGState)

# Register the stages in execution order.
_pipeline_nodes = [
    ("query_analysis", query_analysis_node),
    ("parallel_execution", parallel_tool_execution_node),
    ("result_fusion", result_fusion_node),
    ("answer_generation", answer_generation_node),
]
for _node_name, _node_fn in _pipeline_nodes:
    workflow.add_node(_node_name, _node_fn)

# Connect each stop on the route to the next one.
_route = [START, "query_analysis", "parallel_execution", "result_fusion", "answer_generation", END]
for _source, _target in zip(_route, _route[1:]):
    workflow.add_edge(_source, _target)

# Compiled graph; run_parallel_rag invokes it.
app = workflow.compile()

In [None]:
def run_parallel_rag(query: str):
    """Execute the compiled graph for one query, printing a banner, the answer and timing.

    Args:
        query: The question to answer.

    Returns:
        The state returned by `app.invoke`.
    """
    banner = "=" * 60

    print("\n" + banner)
    print("🚀 PARALLEL TOOL-CALLING RAG SYSTEM")
    print(banner)

    # Timing starts after the header is printed and covers only the graph run.
    start_time = time.time()

    # Every state key gets an explicit starting value.
    initial_state = dict(
        messages=[HumanMessage(content=query)],
        query=query,
        tool_calls=[],
        tool_results={},
        parallel_results={},
        final_context="",
        answer="",
    )

    result = app.invoke(initial_state)

    elapsed = time.time() - start_time

    print("\n" + banner)
    print("📝 FINAL ANSWER")
    print(banner)
    print(result["answer"])
    print(f"\n⏱️  Total execution time: {elapsed:.2f}s")
    print(banner)

    return result

In [None]:
# Demo queries run through the full pipeline below.
test_queries = [
    "What are the latest developments in machine learning and RAG systems in 2025?",
    "What is quantum computing and how does it work?",
    "Recent research papers on large language models",
]

# Pages loaded into the vector store before the queries run.
sample_urls = [
    "https://en.wikipedia.org/wiki/Retrieval-augmented_generation",
    "https://en.wikipedia.org/wiki/Machine_learning",
]

print("📥 Loading sample documents...")
for sample_url in sample_urls:
    try:
        result = document_loader_tool.invoke({"url": sample_url})
    except Exception as load_error:
        print(f"✗ Error loading {sample_url}: {load_error}")
    else:
        print(f"✓ {result}")

# Run each query, pausing for Enter between runs (not after the last one).
query_total = len(test_queries)
for query_number, query in enumerate(test_queries, start=1):
    print(f"\n\n🔍 TEST QUERY {query_number}")
    result = run_parallel_rag(query)

    if query_number >= query_total:
        continue
    input("\nPress Enter to continue to next query...")

📥 Loading sample documents...
✓ Successfully loaded 1 documents from https://en.wikipedia.org/wiki/Retrieval-augmented_generation. Added to vector database.
✓ Successfully loaded 1 documents from https://en.wikipedia.org/wiki/Machine_learning. Added to vector database.


🔍 TEST QUERY 1

🚀 PARALLEL TOOL-CALLING RAG SYSTEM
🔍 Query: What are the latest developments in machine learning and RAG systems in 2025?
🛠️  Selected tools: vector_search_tool, web_search_tool, arxiv_search_tool
⚡ Executing 3 tools in parallel...
✓ vector_search_tool completed in 0.06s
✓ arxiv_search_tool completed in 1.04s
✓ web_search_tool completed in 13.12s

🎯 Parallel execution completed in 13.12s
📊 Fused results from 3 successful tools
✨ Generated comprehensive answer with 4628 characters of context

📝 FINAL ANSWER
**The landscape of machine learning (ML) and Retrieval‑Augmented Generation (RAG) has moved forward dramatically in 2025.**  
Below is a synthesis of the most‑relevant developments that have been repo

In [None]:
def get_system_stats():
    """Return a summary dict: vector-store document count plus static feature labels."""
    doc_count = vectorstore._collection.count()
    tool_labels = ["web_search", "wikipedia", "arxiv", "vector_search", "document_loader"]
    return {
        "vector_db_docs": doc_count,
        "available_tools": tool_labels,
        "parallel_execution": "enabled",
        "intelligent_routing": "enabled",
    }

get_system_stats()

{'vector_db_docs': 4,
 'available_tools': ['web_search',
  'wikipedia',
  'arxiv',
  'vector_search',
  'document_loader'],
 'parallel_execution': 'enabled',
 'intelligent_routing': 'enabled'}