# Chatbot Assistant Prototype with Conversation Memory & Session IDs

This notebook demonstrates an updated chatbot assistant that:
- Uses **LangChain** with an OpenAI LLM and a Pinecone‑hosted RAG knowledge base.
- Integrates a **weather tool** (using wttr.in as a demo API).
- Maintains **conversation memory** using session IDs so that previous Q&A pairs are considered.
- Supports both **non‑streaming** and **streaming** output.

Make sure to set the following environment variables (e.g., in your shell or within the notebook):
- `OPENAI_API_KEY`
- `PINECONE_API_KEY`
- `PINECONE_ENVIRONMENT`
- `PINECONE_INDEX_NAME`

In [None]:
# Install the required packages.
# Run this cell to ensure you have all dependencies installed.
!pip install langchain langchain_community pinecone-client openai requests nest_asyncio sse-starlette fastapi uvicorn tiktoken

In [36]:
import os
import uuid
import asyncio
import requests

import pinecone
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Optional

# LangChain imports
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Pinecone as LC_Pinecone
from langchain.chat_models import ChatOpenAI
from langchain.agents import initialize_agent, Tool
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chains import RetrievalQA

# For streaming via Server-Sent Events (SSE)
from sse_starlette.sse import EventSourceResponse

# Allow nested event loops in notebooks
import nest_asyncio
nest_asyncio.apply()

In [37]:
# Global in-memory conversation memory.
# Maps a session_id (string) to a list of conversation turns.
chat_memories = {}

def get_current_weather(location: str, unit: str = "Celsius") -> str:
    """
    Fetch current weather information for a given location.
    (Uses wttr.in as a demo API in place of Tavily.)
    """
    try:
        response = requests.get(f"https://wttr.in/{location}?format=3")
        if response.status_code == 200:
            return response.text.strip()
        else:
            return f"Could not fetch weather data for {location}."
    except Exception as e:
        return f"Error fetching weather data: {str(e)}"

# Wrap the weather function as a LangChain Tool.
weather_tool = Tool(
    name="get_current_weather",
    func=get_current_weather,
    description=(
        "Useful for when you need to get the current weather information for a location. "
        "The input to this tool should be a location name (e.g., 'Paris')."
    )
)

In [38]:
class ChatbotAssistant:
    def __init__(self, streaming: bool = False):
        self.streaming = streaming
        callbacks = [StreamingStdOutCallbackHandler()] if streaming else []
        
        # Initialize the LLM (here, GPT‑4 via ChatOpenAI).
        self.llm = ChatOpenAI(
            model_name="gpt-4",
            streaming=streaming,
            callbacks=callbacks,
            temperature=0.0,
        )
        
        # Set up OpenAI embeddings.
        self.embeddings = OpenAIEmbeddings()
        
        # Initialize Pinecone using environment variables.
        pinecone_api_key = os.environ.get("PINECONE_API_KEY")
        pinecone_env = os.environ.get("PINECONE_ENVIRONMENT", "us-east-1")
        pinecone_index_name = os.environ.get("PINECONE_INDEX_NAME", "schh")
        if not pinecone_api_key or not pinecone_env or not pinecone_index_name:
            raise Exception("Missing one or more required Pinecone environment variables.")
        # pinecone.init(api_key=pinecone_api_key, environment=pinecone_env)

        # Connect to the existing Pinecone index.
        self.vectorstore = LC_Pinecone.from_existing_index(
            index_name=pinecone_index_name,
            embedding=self.embeddings,
            text_key="text"  # Assumes documents are indexed under the "text" field.
        )
        
        # Build a RetrievalQA chain that uses the Pinecone vectorstore.
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",  # "stuff" simply concatenates documents.
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": 4}),
            return_source_documents=True,
        )
        
        # Initialize an agent that has access to the weather tool.
        self.agent = initialize_agent(
            tools=[weather_tool],
            llm=self.llm,
            agent="zero-shot-react-description",
            verbose=True,
        )
        
    def answer_query(self, query: str, session_id: Optional[str] = None) -> dict:
        """
        Answer the given query while considering previous conversation turns if provided.
        If a session_id is provided and history exists, that history is prepended to the query.
        Weather-related queries are handled by the agent (with the weather tool);
        other queries use the RetrievalQA chain.
        
        After obtaining an answer, the new Q/A pair is appended to the conversation history.
        
        Returns:
            dict: { "answer": <text>, "sources": [<source1>, ...] }
        """
        original_query = query  # Preserve the original question.
        if session_id is not None:
            history = chat_memories.get(session_id, [])
            if history:
                context = "\n".join(history)
                query = f"Conversation History:\n{context}\n\nNew Question: {query}"
        
        if "weather" in original_query.lower():
            answer = self.agent.run(query)
            result = {"answer": answer, "sources": []}
        else:
            chain_result = self.qa_chain(query)
            answer = chain_result.get("result", "")
            sources = []
            for doc in chain_result.get("source_documents", []):
                source_info = doc.metadata.get("source", "Unknown source")
                sources.append(source_info)
            result = {"answer": answer, "sources": sources}
        
        # Update conversation memory.
        if session_id is not None:
            memory_entry = f"Q: {original_query}\nA: {answer}"
            if session_id in chat_memories:
                chat_memories[session_id].append(memory_entry)
            else:
                chat_memories[session_id] = [memory_entry]
        return result

In [42]:
app = FastAPI()

# Instantiate the assistant (defaulting to non‑streaming mode).
assistant = ChatbotAssistant(streaming=False)

# Define request/response models.
class QueryRequest(BaseModel):
    query: str
    session_id: Optional[str] = None  # Client may supply a session ID.
    streaming: Optional[bool] = False  # Option for streaming output.

class QueryResponse(BaseModel):
    answer: str
    sources: List[str] = []
    session_id: str  # Returned so the client can continue the conversation.

@app.post("/chat", response_model=QueryResponse)
async def chat_endpoint(request: QueryRequest):
    """
    Synchronous (non‑streaming) endpoint.
    If no session_id is provided, a new one is generated.
    """
    global assistant
    if request.streaming != assistant.streaming:
        assistant = ChatbotAssistant(streaming=request.streaming)
    session_id = request.session_id or str(uuid.uuid4())
    result = assistant.answer_query(request.query, session_id=session_id)
    return QueryResponse(answer=result["answer"], sources=result["sources"], session_id=session_id)

# Custom callback handler for streaming tokens.
class QueueCallbackHandler(StreamingStdOutCallbackHandler):
    def __init__(self):
        super().__init__()
        self.queue = asyncio.Queue()

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        asyncio.create_task(self.queue.put(token))

async def stream_tokens(query: str, session_id: Optional[str] = None):
    """
    Generator that streams tokens as SSE events.
    If conversation history exists for the session, it is prepended.
    """
    original_query = query
    if session_id is not None:
        history = chat_memories.get(session_id, [])
        if history:
            context = "\n".join(history)
            query = f"Conversation History:\n{context}\n\nNew Question: {query}"
    handler = QueueCallbackHandler()
    llm_stream = ChatOpenAI(
        model_name="gpt-4",
        streaming=True,
        callbacks=[handler],
        temperature=0.0,
    )
    if "weather" in original_query.lower():
        agent = initialize_agent(
            tools=[weather_tool],
            llm=llm_stream,
            agent="zero-shot-react-description",
            verbose=True,
        )
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(None, agent.run, query)
    else:
        retrieval_chain = RetrievalQA.from_chain_type(
            llm=llm_stream,
            chain_type="stuff",
            retriever=assistant.vectorstore.as_retriever(search_kwargs={"k": 4}),
            return_source_documents=True,
        )
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(None, retrieval_chain, query)
    
    answer_parts = []
    while True:
        try:
            token = await asyncio.wait_for(handler.queue.get(), timeout=1.0)
            answer_parts.append(token)
            yield f"data: {token}\n\n"
        except asyncio.TimeoutError:
            if future.done():
                break
    answer = "".join(answer_parts)
    if session_id is not None:
        memory_entry = f"Q: {original_query}\nA: {answer}"
        if session_id in chat_memories:
            chat_memories[session_id].append(memory_entry)
        else:
            chat_memories[session_id] = [memory_entry]
    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream_endpoint(request: QueryRequest):
    """
    Streaming endpoint (SSE) that uses a session ID to track conversation history.
    """
    session_id = request.session_id or str(uuid.uuid4())
    return EventSourceResponse(stream_tokens(request.query, session_id=session_id))

In [43]:
# Instantiate a non‑streaming assistant.
assistant = ChatbotAssistant(streaming=False)

# Create a new session.
session_id = str(uuid.uuid4())
print("Session ID:", session_id)

# Example 1: Weather query.
query1 = "What's the weather in London?"
result1 = assistant.answer_query(query1, session_id=session_id)
print("Query 1:", query1)
print("Answer 1:", result1["answer"])
print("Sources 1:", result1["sources"])

# Example 2: Follow-up query (using conversation history).
query2 = "And what about tomorrow?"
result2 = assistant.answer_query(query2, session_id=session_id)
print("\nQuery 2:", query2)
print("Answer 2:", result2["answer"])
print("Sources 2:", result2["sources"])

Session ID: ebe466dd-dcff-4573-8e7c-540e3970b10c


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mI need to use the get_current_weather tool to find out the current weather in London.
Action: get_current_weather
Action Input: London[0m
Observation: [36;1m[1;3mLondon: ☀️   +1°C[0m
Thought:[32;1m[1;3mI now know the final answer
Final Answer: The weather in London is sunny with a temperature of +1°C.[0m

[1m> Finished chain.[0m
Query 1: What's the weather in London?
Answer 1: The weather in London is sunny with a temperature of +1°C.
Sources 1: []

Query 2: And what about tomorrow?
Answer 2: I'm sorry, but I don't have the ability to provide weather forecasts.
Sources 2: ['./sports/golf/2024GolfTownHall2.pdf', 'clubs/computer-club/education-weekly.md', 'clubs/computer-club/education-weekly.md', 'clubs/computer-club/education-weekly.md']


In [None]:
async def test_streaming():
    session_id = str(uuid.uuid4())
    print("Streaming Session ID:", session_id)
    query = "Tell me about the benefits of renewable energy."
    print("Streaming Query:", query)
    async for token in stream_tokens(query, session_id=session_id):
        print(token, end='')

# Uncomment and run the cell below to test streaming.
await test_streaming()

Streaming Session ID: 4a684c4b-f013-4800-86a4-50c99685ce18
Streaming Query: Tell me modification requests.
data: [DONE]

