<a href="https://colab.research.google.com/github/ben45123/AWS-Lambda-Research/blob/main/rag_news_agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [23]:
# Cell 1: Install the necessary packages
!pip install pinecone langchain langchain_openai openai langgraph pydantic



In [24]:
# Cell 2: Import libraries and set API keys
import os
import json
import pandas as pd
from typing import List, Dict, Any
import pinecone
from google.colab import userdata

# Set API keys from Colab secrets (Colab sidebar -> "Secrets").
# Never paste keys into the notebook itself: notebooks are committed and shared.
# Any key that was ever committed in plain text must be revoked and rotated
# in the provider's console.
OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
PINECONE_API_KEY = userdata.get('PINECONE_API_KEY')

if not OPENAI_API_KEY or not PINECONE_API_KEY:
    raise ValueError(
        "OPENAI_API_KEY and PINECONE_API_KEY must be set in Colab secrets before running this notebook."
    )

# Set environment variables (some LangChain components use these)
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

# Configuration for Pinecone
PINECONE_INDEX_NAME = "cus635"
PINECONE_ENVIRONMENT = "us-east-1"  # region of the index; not referenced by the code below
NAMESPACE = "Team_1"
CATEGORY = "Finance"

print("API keys loaded successfully!")

API keys loaded successfully!


In [25]:
# Cell 3: Import LangChain and LangGraph components
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.vectorstores import Pinecone as LangchainPinecone
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.chains import ConversationalRetrievalChain
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import AIMessage, HumanMessage
from langgraph.graph import StateGraph, END
from pydantic import BaseModel, Field

# Shared model clients used by every RAG variant in this notebook.
# NOTE(review): the embedding model must match the one used to populate the
# Pinecone index, otherwise similarity scores are meaningless — confirm.
embeddings = OpenAIEmbeddings()

# Low temperature (0.2) for more deterministic, less creative answers.
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2)

print("LangChain components initialized!")

LangChain components initialized!


In [26]:
# Cell 4: Create the NewsRAG class for basic RAG functionality
class NewsRAG:
    """
    A RAG system that uses LangChain to answer questions based on news articles
    stored in Pinecone.

    Typical use: connect_to_pinecone() -> initialize_retriever() ->
    initialize_rag_chain() -> query(question). Each setup step returns True on
    success and False (after printing the error) on failure.
    """

    def __init__(self, api_key: str, index_name: str, namespace: str, category: str):
        """
        Initialize the NewsRAG with Pinecone credentials and team information.

        Args:
            api_key: Pinecone API key
            index_name: Pinecone index name
            namespace: Namespace (team name)
            category: News category, used as a metadata filter on retrieval
        """
        self.api_key = api_key
        self.index_name = index_name
        self.namespace = namespace
        self.category = category
        self.pinecone_index = None
        self.retriever = None
        self.rag_chain = None
        self.conversational_memory = []  # NOTE(review): not read anywhere in this class

    def connect_to_pinecone(self):
        """Connect to Pinecone and open the configured index.

        Returns:
            True on success, False if the client or index could not be created.
        """
        try:
            # The pinecone>=3 client needs only the API key; the index is opened
            # by name, so no environment/region argument is passed.
            pinecone_client = pinecone.Pinecone(api_key=self.api_key)
            self.pinecone_index = pinecone_client.Index(self.index_name)
            print(f"Connected to Pinecone index: {self.index_name}")
            return True
        except Exception as e:
            print(f"Error connecting to Pinecone: {str(e)}")
            return False

    def _create_vectorstore(self):
        """Wrap the connected Pinecone index in a LangChain vector store."""
        try:
            # Preferred integration: accepts Index objects from the current
            # pinecone SDK (install the `langchain-pinecone` package).
            from langchain_pinecone import PineconeVectorStore as store_cls
        except ImportError:
            # Legacy fallback. Its isinstance(index, pinecone.Index) check rejects
            # Index objects from newer pinecone SDKs, producing
            # "client should be an instance of pinecone.Index".
            store_cls = LangchainPinecone
        return store_cls(
            index=self.pinecone_index,
            embedding=embeddings,
            text_key="text",
            namespace=self.namespace,
        )

    def initialize_retriever(self, k: int = 5):
        """Initialize the retriever from the Pinecone index.

        Args:
            k: Number of documents to retrieve per query.

        Returns:
            True on success, False if not connected or the store could not be built.
        """
        if self.pinecone_index is None:
            print("Error initializing retriever: not connected to Pinecone (call connect_to_pinecone first)")
            return False

        try:
            vectorstore = self._create_vectorstore()

            # Restrict search to this notebook's news category.
            self.retriever = vectorstore.as_retriever(
                search_kwargs={
                    "k": k,
                    "filter": {"category": self.category}
                }
            )

            print(f"Retriever initialized for category: {self.category}")
            return True
        except Exception as e:
            print(f"Error initializing retriever: {str(e)}")
            return False

    @staticmethod
    def _format_docs(docs: List[Any]) -> str:
        """Render retrieved documents as numbered plain-text articles for the prompt.

        Uses the 'title' and 'source' metadata keys when present.
        """
        blocks = []
        for i, doc in enumerate(docs, start=1):
            meta = doc.metadata or {}
            blocks.append(
                f"Article {i}: {meta.get('title', 'Untitled')}\n"
                f"Source: {meta.get('source', 'Unknown')}\n"
                f"{doc.page_content}"
            )
        return "\n\n".join(blocks)

    def initialize_rag_chain(self):
        """Initialize the basic RAG chain.

        Initializes the retriever first if needed.

        Returns:
            True on success, False if no retriever is available.
        """
        if not self.retriever and not self.initialize_retriever():
            print("Cannot initialize RAG chain: retriever is unavailable.")
            return False

        # Define the prompt template
        template = """You are an AI assistant specialized in analyzing news articles in the {category} domain.
        Use the following retrieved news articles to answer the question.

        Retrieved articles:
        {context}

        Question: {question}

        Answer the question based on the retrieved articles. If the retrieved articles don't contain the information
        needed to answer the question accurately, acknowledge the limitations and provide the best answer possible
        based on available information. Include relevant sources in your response.
        """

        prompt = ChatPromptTemplate.from_template(template)

        # The question string fans out: the retriever turns it into formatted
        # article text, and it is also passed through verbatim as {question}.
        self.rag_chain = (
            {
                "context": self.retriever | self._format_docs,
                "question": RunnablePassthrough(),
                "category": lambda _: self.category,
            }
            | prompt
            | llm
            | StrOutputParser()
        )

        print("Basic RAG chain initialized")
        return True

    def query(self, question: str) -> str:
        """
        Query the RAG system with a question.

        Args:
            question: User's question

        Returns:
            Answer based on the retrieved documents, or an
            "Error processing your query: ..." message if anything fails.
        """
        try:
            if not self.rag_chain and not self.initialize_rag_chain():
                raise RuntimeError("RAG chain could not be initialized (is Pinecone connected?)")
            return self.rag_chain.invoke(question)
        except Exception as e:
            print(f"Error querying RAG: {str(e)}")
            return f"Error processing your query: {str(e)}"

# Test the class definition
print("NewsRAG class defined successfully!")

NewsRAG class defined successfully!


In [27]:
# Cell 5: Add conversational RAG functionality
class ConversationalNewsRAG(NewsRAG):
    """Extends NewsRAG with conversational capabilities.

    Wraps the parent's retriever in a LangChain ConversationalRetrievalChain and
    feeds the accumulated (question, answer) history into every call, so
    follow-up questions can refer to earlier turns.
    """

    def __init__(self, api_key: str, index_name: str, namespace: str, category: str):
        """Initialize using parent constructor plus an empty chat history."""
        super().__init__(api_key, index_name, namespace, category)
        # (question, answer) tuples, appended after each successful query.
        self.chat_history = []
        self.conversational_rag = None

    def initialize_conversational_rag(self):
        """Initialize a conversational RAG chain with memory.

        Returns:
            True on success, False if no retriever could be initialized.
        """
        # Building the chain with a missing retriever would fail with an opaque
        # validation error, so bail out explicitly instead.
        if not self.retriever and not self.initialize_retriever():
            print("Cannot initialize conversational RAG: retriever is unavailable.")
            return False

        self.conversational_rag = ConversationalRetrievalChain.from_llm(
            llm=llm,
            retriever=self.retriever,
            return_source_documents=True
        )

        print("Conversational RAG chain initialized")
        return True

    def conversational_query(self, question: str) -> Dict:
        """
        Query the conversational RAG system with chat history.

        Args:
            question: User's question

        Returns:
            The chain's result dict (with "answer" and "source_documents"), or
            {"answer": "Error processing your query: ...", "source_documents": []}
            if initialization or the query fails.
        """
        try:
            # Lazy initialization lives inside the try so setup failures are
            # reported like query failures instead of escaping to the caller.
            if not self.conversational_rag and not self.initialize_conversational_rag():
                raise RuntimeError("conversational RAG chain could not be initialized")

            result = self.conversational_rag.invoke({
                "question": question,
                "chat_history": self.chat_history
            })

            # Only successful turns become part of the conversation context.
            self.chat_history.append((question, result["answer"]))

            return result
        except Exception as e:
            print(f"Error in conversational query: {str(e)}")
            return {"answer": f"Error processing your query: {str(e)}", "source_documents": []}

# Test the class definition
print("ConversationalNewsRAG class defined successfully!")

ConversationalNewsRAG class defined successfully!


In [28]:
# Cell 6: Define the state for our LangGraph agent
class AgentState(BaseModel):
    """Shared state that flows between the nodes of the LangGraph news agent."""

    # The user's question; the only field without a default.
    question: str = Field(description="The current question being asked")
    # Free-text reasoning produced while analyzing the question.
    thoughts: str = Field("", description="The agent's thoughts about how to answer the question")
    # Flag deciding whether the retrieval step should query the vector store.
    research_needed: bool = Field(False, description="Whether more research is needed")
    # Retrieved articles, each stored as a {"content": ..., "metadata": ...} dict.
    retrieved_documents: List[Dict] = Field(
        default_factory=list,
        description="Documents retrieved from the vector database",
    )
    # Text handed back to the user at the end of the workflow.
    final_answer: str = Field("", description="The final answer to the user's question")


print("AgentState class defined for LangGraph!")

AgentState class defined for LangGraph!


In [29]:
# Cell 7: Create the NewsRAGAgent with LangGraph for reasoning
class NewsRAGAgent:
    """A more advanced RAG agent using LangGraph for reasoning.

    Workflow: analyze_question -> retrieve_documents -> synthesize_answer (only
    when documents were retrieved) -> deliver_answer -> END.
    Each node returns a partial state update: a dict of AgentState fields.
    """

    def __init__(self, news_rag: NewsRAG):
        """
        Args:
            news_rag: Base RAG object supplying the retriever and news category.
        """
        self.news_rag = news_rag
        self.workflow = None  # compiled graph, built lazily by build_workflow()

    def _analyze_question(self, state: AgentState) -> Dict[str, Any]:
        """Ask the LLM how to approach the question and decide whether to retrieve."""
        analysis_prompt = ChatPromptTemplate.from_template(
            """Analyze the following question related to {category} news:

            Question: {question}

            Think about what kind of information is needed to answer this question properly.
            Should I retrieve specific articles or information from the database?

            Thoughts:"""
        )

        thoughts = llm.invoke(
            analysis_prompt.format_messages(
                question=state.question,
                category=self.news_rag.category
            )
        ).content

        lowered = thoughts.lower()
        # Keyword heuristic: any mention of retrieving/searching/the database
        # triggers retrieval. NOTE(review): negated phrasing ("no need to
        # search") also matches, so retrieval runs in most cases.
        research_needed = any(word in lowered for word in ("retrieve", "search", "database"))
        return {"thoughts": thoughts, "research_needed": research_needed}

    def _retrieve_documents(self, state: AgentState) -> Dict[str, Any]:
        """Retrieve relevant documents when the analysis step asked for research.

        Always writes `retrieved_documents` (an empty list when retrieval is
        skipped or fails), so the router downstream sees a definite value.
        """
        documents: List[Dict] = []
        if state.research_needed:
            try:
                rag = self.news_rag
                if not rag.retriever and not rag.initialize_retriever():
                    raise RuntimeError("retriever is unavailable")

                docs = rag.retriever.invoke(state.question)
                documents = [
                    {"content": doc.page_content, "metadata": doc.metadata}
                    for doc in docs
                ]
            except Exception as e:
                print(f"Error retrieving documents: {str(e)}")
        return {"retrieved_documents": documents}

    def _synthesize_answer(self, state: AgentState) -> Dict[str, Any]:
        """Synthesize an answer based on retrieved documents."""
        if not state.retrieved_documents:
            return {
                "final_answer": "I couldn't find any relevant information to answer your question about "
                + self.news_rag.category + "."
            }

        synthesis_prompt = ChatPromptTemplate.from_template(
            """You are a specialized AI assistant for {category} news analysis.

            Question: {question}

            Retrieved documents:
            {documents}

            Based on these documents, provide a comprehensive answer to the question.
            Include sources where appropriate. If the retrieved documents don't contain
            sufficient information, acknowledge this limitation.

            Answer:"""
        )

        docs_text = "\n\n".join(
            f"Document {i}:\n{doc['content']}\nSource: {(doc.get('metadata') or {}).get('source', 'Unknown')}"
            for i, doc in enumerate(state.retrieved_documents, start=1)
        )

        answer = llm.invoke(
            synthesis_prompt.format_messages(
                question=state.question,
                documents=docs_text,
                category=self.news_rag.category
            )
        ).content
        return {"final_answer": answer}

    def _deliver_answer(self, state: AgentState) -> Dict[str, Any]:
        """Ensure a non-empty final answer is delivered."""
        fallback = "I don't have enough information to answer your question about " + self.news_rag.category + "."
        return {"final_answer": state.final_answer or fallback}

    @staticmethod
    def _route_after_retrieval(state) -> str:
        """Return the name of the node to run after retrieval.

        Accepts either the AgentState model or a plain dict of its fields.
        """
        if isinstance(state, dict):
            documents = state.get("retrieved_documents")
        else:
            documents = state.retrieved_documents
        return "synthesize_answer" if documents else "deliver_answer"

    def build_workflow(self):
        """Build and compile the agent workflow graph.

        Returns:
            The compiled LangGraph graph, also stored on `self.workflow`.
        """
        workflow = StateGraph(AgentState)

        workflow.add_node("analyze_question", self._analyze_question)
        workflow.add_node("retrieve_documents", self._retrieve_documents)
        workflow.add_node("synthesize_answer", self._synthesize_answer)
        workflow.add_node("deliver_answer", self._deliver_answer)

        workflow.set_entry_point("analyze_question")
        workflow.add_edge("analyze_question", "retrieve_documents")
        # path_map keys must equal the router's return values (node names).
        workflow.add_conditional_edges(
            "retrieve_documents",
            self._route_after_retrieval,
            {
                "synthesize_answer": "synthesize_answer",
                "deliver_answer": "deliver_answer",
            }
        )
        workflow.add_edge("synthesize_answer", "deliver_answer")
        workflow.add_edge("deliver_answer", END)

        self.workflow = workflow.compile()
        return self.workflow

    def run(self, question: str) -> str:
        """Run the agent with a question and return the final answer text.

        Returns an "Error processing your query: ..." message if the workflow fails.
        """
        if self.workflow is None:
            self.build_workflow()

        try:
            final_state = self.workflow.invoke(AgentState(question=question))
        except Exception as e:
            print(f"Error running agent: {str(e)}")
            return f"Error processing your query: {str(e)}"

        # LangGraph returns the final state as a dict of field values, even
        # when the state schema is a pydantic model.
        if isinstance(final_state, dict):
            return final_state.get("final_answer", "")
        return getattr(final_state, "final_answer", "")

# Test the class definition
print("NewsRAGAgent class with LangGraph defined successfully!")

NewsRAGAgent class with LangGraph defined successfully!


In [30]:
# Cell 8: Initialize and test the basic RAG system
rag = NewsRAG(
    api_key=PINECONE_API_KEY,
    index_name=PINECONE_INDEX_NAME,
    namespace=NAMESPACE,
    category=CATEGORY,
)

# Each setup step: (announcement, action, success message, failure message).
# Steps run in order; the first failing step stops the sequence.
setup_steps = [
    ("Connecting to Pinecone...", rag.connect_to_pinecone,
     "Successfully connected to Pinecone!", "Failed to connect to Pinecone."),
    ("Initializing retriever...", rag.initialize_retriever,
     "Retriever initialized!", "Failed to initialize retriever."),
    ("Initializing RAG chain...", rag.initialize_rag_chain,
     "RAG chain initialized!", "Failed to initialize RAG chain."),
]

for announcement, action, success_msg, failure_msg in setup_steps:
    print(announcement)
    if not action():
        print(failure_msg)
        break
    print(success_msg)
else:
    # Reached only when every setup step succeeded (no `break`).
    test_question = "What are the recent trends in finance?"
    print(f"\nTesting with question: '{test_question}'")
    answer = rag.query(test_question)
    print(f"\nAnswer: {answer}")

Connecting to Pinecone...
Connected to Pinecone index: cus635
Successfully connected to Pinecone!
Initializing retriever...
Error initializing retriever: client should be an instance of pinecone.Index, got <class 'pinecone.data.index.Index'>
Failed to initialize retriever.


In [31]:
# Cell 9: Test the conversational RAG system
conv_rag = ConversationalNewsRAG(
    api_key=PINECONE_API_KEY,
    index_name=PINECONE_INDEX_NAME,
    namespace=NAMESPACE,
    category=CATEGORY,
)

print("Connecting to Pinecone for conversational RAG...")
# `and` short-circuits: the retriever is only built after a successful connection.
conv_ready = conv_rag.connect_to_pinecone() and conv_rag.initialize_retriever()

if not conv_ready:
    print("Failed to initialize the conversational RAG system.")
else:
    print("Conversational RAG system ready!")

    # Later questions lean on chat history ("their" refers to the markets).
    questions = [
        "What are the major financial markets performing right now?",
        "What factors are affecting their performance?",
        "Which companies are mentioned most frequently in recent financial news?",
    ]

    for number, question in enumerate(questions, start=1):
        print(f"\nQuestion {number}: {question}")
        result = conv_rag.conversational_query(question)
        print(f"\nAnswer: {result['answer']}")
        print("\nSources:")
        top_sources = result.get("source_documents", [])[:2]
        for rank, doc in enumerate(top_sources, start=1):
            meta = doc.metadata
            print(f"  {rank}. {meta.get('title', 'Unknown title')} - {meta.get('source', 'Unknown source')}")

Connecting to Pinecone for conversational RAG...
Connected to Pinecone index: cus635
Error initializing retriever: client should be an instance of pinecone.Index, got <class 'pinecone.data.index.Index'>
Failed to initialize the conversational RAG system.


In [32]:
# Cell 10: Test the LangGraph agent
# The agent wraps its own NewsRAG instance, which needs a working retriever.
rag_base = NewsRAG(
    api_key=PINECONE_API_KEY,
    index_name=PINECONE_INDEX_NAME,
    namespace=NAMESPACE,
    category=CATEGORY,
)

print("Setting up the base RAG system for our agent...")
base_ready = rag_base.connect_to_pinecone() and rag_base.initialize_retriever()

if not base_ready:
    print("Failed to set up the base RAG system for the agent.")
else:
    print("Base RAG system ready for the agent!")

    print("Creating LangGraph agent...")
    agent = NewsRAGAgent(rag_base)

    test_questions = [
        "What is the current state of cryptocurrency markets?",
        "How are regulatory changes affecting the financial sector?",
        "What are the implications of recent financial news for retail investors?",
    ]

    print("\nTesting the LangGraph agent with different questions:")
    for number, question in enumerate(test_questions, start=1):
        print(f"\nQuestion {number}: {question}")
        answer = agent.run(question)
        print(f"\nAgent response:\n{answer}")

Setting up the base RAG system for our agent...
Connected to Pinecone index: cus635
Error initializing retriever: client should be an instance of pinecone.Index, got <class 'pinecone.data.index.Index'>
Failed to set up the base RAG system for the agent.


In [33]:
# Cell 11: Create a user-friendly interface for interacting with our RAG systems
def display_welcome():
    """Print the demo banner, the available modes, and the session commands."""
    welcome_text = """
    # 📰 News RAG Agent - Interactive Demo 📰

    This system uses RAG (Retrieval-Augmented Generation) to answer questions about news articles.

    Choose a mode:
    1. Basic RAG - Simple question answering
    2. Conversational RAG - Maintains context across questions
    3. LangGraph Agent - Advanced reasoning with multiple steps

    Type 'change mode' at the question prompt to switch modes.
    Type 'exit' at any time to quit.
    """
    print(welcome_text)

def run_interactive_session():
    """Run an interactive question/answer loop over the three RAG modes.

    Builds a basic RAG, a conversational RAG and a LangGraph agent, then
    repeatedly prompts for a question in the selected mode.

    Commands (case-insensitive, surrounding whitespace ignored):
        'exit'        -- quit; accepted at the mode prompt and the question prompt
        'change mode' -- choose a different mode; accepted at the question prompt
    """
    display_welcome()

    print("\nInitializing all RAG systems...")

    basic_rag = NewsRAG(
        api_key=PINECONE_API_KEY,
        index_name=PINECONE_INDEX_NAME,
        namespace=NAMESPACE,
        category=CATEGORY
    )

    conv_rag = ConversationalNewsRAG(
        api_key=PINECONE_API_KEY,
        index_name=PINECONE_INDEX_NAME,
        namespace=NAMESPACE,
        category=CATEGORY
    )

    agent_base = NewsRAG(
        api_key=PINECONE_API_KEY,
        index_name=PINECONE_INDEX_NAME,
        namespace=NAMESPACE,
        category=CATEGORY
    )
    agent = NewsRAGAgent(agent_base)

    # Short-circuits on the first failing step, so later systems are not touched.
    systems_ready = (
        basic_rag.connect_to_pinecone()
        and basic_rag.initialize_retriever()
        and basic_rag.initialize_rag_chain()
        and conv_rag.connect_to_pinecone()
        and conv_rag.initialize_retriever()
        and conv_rag.initialize_conversational_rag()
        and agent_base.connect_to_pinecone()
        and agent_base.initialize_retriever()
        # Compare to None rather than relying on the compiled graph's truthiness.
        and agent.build_workflow() is not None
    )
    if not systems_ready:
        print("Failed to initialize RAG systems.")
        return

    print("\nAll systems ready!")

    mode_names = {
        "1": "Basic RAG",
        "2": "Conversational RAG",
        "3": "LangGraph Agent"
    }

    mode = input("\nChoose a mode (1, 2, or 3): ").strip()

    while True:
        # Honour 'exit' at the mode prompt too, as the welcome text promises.
        if mode.lower() == "exit":
            break
        if mode not in mode_names:
            mode = input("Please enter a valid mode (1, 2, or 3): ").strip()
            continue

        print(f"\n[{mode_names[mode]} Mode]")

        question = input("\nYour question: ").strip()
        command = question.lower()

        if command == "exit":
            break

        if command == "change mode":
            mode = input("\nChoose a new mode (1, 2, or 3): ").strip()
            continue

        if not question:
            # Nothing to ask; prompt again instead of sending empty text to the LLM.
            continue

        print("\nProcessing...")

        if mode == "1":
            answer = basic_rag.query(question)
            print(f"\nAnswer: {answer}")

        elif mode == "2":
            result = conv_rag.conversational_query(question)
            print(f"\nAnswer: {result['answer']}")
            print("\nSources:")
            for i, doc in enumerate(result.get("source_documents", [])[:2]):
                print(f"  {i+1}. {doc.metadata.get('title', 'Unknown title')} - {doc.metadata.get('source', 'Unknown source')}")

        else:  # mode == "3"
            answer = agent.run(question)
            print(f"\nAgent response:\n{answer}")

        print("\nType 'change mode' to switch modes or 'exit' to quit.")

# Run the interactive session
run_interactive_session()


    # 📰 News RAG Agent - Interactive Demo 📰
    
    This system uses RAG (Retrieval-Augmented Generation) to answer questions about news articles.
    
    Choose a mode:
    1. Basic RAG - Simple question answering
    2. Conversational RAG - Maintains context across questions
    3. LangGraph Agent - Advanced reasoning with multiple steps
    
    Type 'exit' at any time to quit.
    

Initializing all RAG systems...
Connected to Pinecone index: cus635
Error initializing retriever: client should be an instance of pinecone.Index, got <class 'pinecone.data.index.Index'>
Failed to initialize RAG systems.
