In [None]:
pip install langchain_openai langchain langchain_community faiss-gpu

Create a history-aware retriever

In [None]:
import pandas as pd
import numpy as np
import os
import re
import bs4
from uuid import uuid4
from langchain import hub
from langchain_community.document_loaders import PyPDFLoader, TextLoader, WebBaseLoader
from langchain_community.vectorstores import FAISS
from langchain.chains import RetrievalQA
from langchain.schema import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.agents import AgentExecutor, create_react_agent
from langchain.memory import ChatMessageHistory
from langchain_core.tools import Tool
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_text_splitters import RecursiveCharacterTextSplitter
print('Import completed')

In [None]:
# SETUPs

# Set up API key.
# Read the key from the OPENAI_API_KEY environment variable so a real secret
# never has to be saved inside the notebook; the original placeholder is kept
# as the fallback so the cell still runs when the variable is not set.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "sk...")

# System prompt for the answering chain: it fixes the section layout the model
# must follow for math problems.
chat_prompt = """You are a mathematical AI assistant. If it is a math related problem, use this response format:

**Problem Analysis**
Analyze the promblem

**Key Formulas**
All formulas that can be used to solve the problem
- Formula 1
- Formula 2
...

**Solution Steps**
Detail explanation for each step
1. step1
2. step2
...

**Self-Check**
Check the intuition, formulas, solution steps. Try to prove your result is wrong. If the result is wrong, redo the problem. If the result is correct, prove it again.
- Check 1
- Check 2
...

**Final Answer**
[Clear conclusion with boxed answer]"""

# Load the agent prompt from LangSmith
agent_prompt = hub.pull("hwchase17/react-chat") # a common agent, should consider create one for this specific task

# Location of the textbook PDF used as retrieval material.
RAG_file_path = '/kaggle/input/probability-textbook/Probability.pdf'

In [None]:


def defined_llm(OPENAI_API_KEY, chat_prompt=chat_prompt):
    """Build a history-aware RAG math assistant and run it as a console chat loop.

    The function wires together an OpenAI chat model, an in-memory FAISS index,
    a question-contextualisation step, an answer -> verification -> correction
    pipeline and per-session chat memory. It then reads messages from
    ``input()`` until the user types ``exit`` or ``quit`` (or the input stream
    ends). ``RAG_file_path="..."`` tokens and http(s) URLs found in a message
    are indexed first; whatever text remains is answered.

    NOTE(review): a later cell of this notebook defines ``defined_llm`` again;
    once that cell runs, it replaces this definition.

    Args:
        OPENAI_API_KEY: API key passed to ``ChatOpenAI`` and ``OpenAIEmbeddings``.
        chat_prompt: System prompt used by the answering chain.

    Returns:
        The ``RunnableWithMessageHistory`` driving the chat. Invoking it with
        ``{"input": ...}`` and a ``session_id`` yields a dict whose
        ``"response"`` key holds the answer text.
    """
    # Cap on stored messages per session.
    # NOTE(review): ChatMessageHistory does not appear to declare a
    # `max_messages` field, so the cap of 20 is enforced explicitly in
    # get_memory() below -- confirm against the installed LangChain version.
    max_history_messages = 20

    SESSION_ID = str(uuid4())
    print(f"Session ID: {SESSION_ID}")

    llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY, model="gpt-4-turbo")
    embedding_provider = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=150,
        # The structural separators are regular expressions, so regex matching
        # has to be switched on; otherwise they are searched for as literal
        # text. The trailing generic separators let oversized paragraphs still
        # be cut down towards chunk_size.
        separators=[r'\n\s*•', r'\n\d+\.', r'\n\*', '\n\n', '\n', ' ', ''],
        is_separator_regex=True,
        add_start_index=True
    )

    # The index is seeded with one placeholder text so it exists before any
    # user document has been added.
    vectorstore = FAISS.from_texts(
        texts=["Math Knowledge Base Initialized"],
        embedding=embedding_provider,
        metadatas=[{"source": "system-init"}]
    )
    retriever = vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs={
            "k": 5,
            # NOTE(review): score_threshold is presumably only honoured by the
            # "similarity_score_threshold" search type -- verify it has any
            # effect with "mmr".
            "score_threshold": 0.7,
            "lambda_mult": 0.5
        }
    )

    def self_detect_content_type(text):
        """Label a chunk "math" if it mentions a math keyword, else "general"."""
        math_keywords = ['theorem', 'formula', 'equation', 'proof', 'lemma']
        if any(kw in text.lower() for kw in math_keywords):
            return "math"
        return "general"

    def format_docs(docs):
        """Render retrieved documents as one context string (500 chars per doc)."""
        # Already-formatted text must pass through untouched: iterating a
        # string would otherwise produce one "Source" entry per character.
        if isinstance(docs, str):
            return docs

        formatted = []
        for i, doc in enumerate(docs):
            # Handle Document objects
            if hasattr(doc, 'metadata'):
                source_type = doc.metadata.get('source_type', 'unknown')
                source = doc.metadata.get('source_path', doc.metadata.get('source_url', 'unknown'))
                content = doc.page_content
            # Handle strings (fallback)
            else:
                source_type = "unknown"
                source = "unknown"
                content = str(doc)

            formatted.append(
                f"📚 Source {i+1} ({source_type}): {source}\n"
                f"{content[:500]}..."
            )
        return "\n\n".join(formatted)

    def debug_retrieval(docs, top_k=3):
        """Print a short preview of the first ``top_k`` retrieved documents.

        Works on documents that were already retrieved, so previewing does not
        trigger a second retrieval call.
        """
        print("\n🔍 Retrieved Context Preview:")
        for i, doc in enumerate(docs[:top_k]):
            print(f"\n📄 Document {i+1}:")
            print(f"   Source: {doc.metadata.get('source_url', doc.metadata.get('source_path', 'unknown'))}")
            print(f"   Content: {doc.page_content[:300]}...")
        return docs

    prompt = ChatPromptTemplate.from_messages([
        ("system", chat_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "Context:\n{context}\n\nQuestion: {input}"),
    ])

    # Core processing chain. It expects "context" to be the already formatted
    # context string; process_query() formats the documents exactly once.
    rag_chain = prompt | llm | StrOutputParser()

    contextualize_q_system_prompt = """Given a chat history and the latest user question:
    1. DO NOT modify or rephrase the original question
    2. Instead, add any relevant background information from the chat history as a prefix
    3. Format the output as:
       [Background Info (if any)]
       Original Question: <exact original question>"""

    contextualize_q_prompt = ChatPromptTemplate.from_messages([
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ])

    # Built once; it does not depend on the individual query.
    standalone_chain = contextualize_q_prompt | llm | StrOutputParser()

    def process_query(input_text, chat_history):
        """Answer one question: contextualise, retrieve, answer, verify, correct.

        Returns the answer followed by the verification text, plus a corrected
        answer when the verification does not contain "VERIFIED".
        """
        standalone_question = standalone_chain.invoke({
            "chat_history": chat_history,
            "input": input_text
        })

        relevant_docs = retriever.invoke(standalone_question)
        debug_retrieval(relevant_docs)
        context = format_docs(relevant_docs)

        response = rag_chain.invoke({
            "input": input_text,
            "context": context,
            "chat_history": chat_history
        })

        # Self-check verification
        check_prompt = f"""Verify this solution contains:
        1. Make sure the logic is correct. Check the method, formulas, each step.
        2. Try to prove the response is incorrect. If it is incorrect, follow the logic and redo the problem.
        3. Prove the response is correct.

        If any issues are found, provide the correction.
        If no issues are found, respond with "VERIFIED" and briefly explain why.
        
        Solution to verify:
        {response}
        
        Missing/incorrect components:"""

        verification = rag_chain.invoke({
            "input": check_prompt,
            "context": context,
            "chat_history": chat_history
        })

        # If verification finds issues, get corrected solution
        if "VERIFIED" not in verification:
            corrected = rag_chain.invoke({
                "input": f"""The previous solution had issues. Please provide a corrected solution that addresses these verification issues:{verification}
                Original question: {input_text}""",
                "context": context,
                "chat_history": chat_history
            })
            return f"{response}\n\n**Original Verification Feedback:**\n{verification}\n\n**Corrected Response:**\n{corrected}"

        return f"{response}\n\n**Verification:**\n{verification}"

    # Memory
    store = {}
    def get_memory(session_id: str) -> ChatMessageHistory:
        """Return the session's history, trimmed to the newest messages."""
        if session_id not in store:
            store[session_id] = ChatMessageHistory()
        history = store[session_id]
        # Drop everything except the last `max_history_messages` entries
        # (a no-op while the history is still short).
        del history.messages[:-max_history_messages]
        return history

    chat_agent = RunnableWithMessageHistory(
        RunnablePassthrough.assign(
            response=lambda x: process_query(x["input"], x["chat_history"])
        ),
        get_memory,
        input_messages_key="input",
        history_messages_key="chat_history",
        # The wrapped runnable returns a dict with several keys, so the key
        # holding the assistant's reply has to be named for it to be stored.
        output_messages_key="response",
    )

    def process_web_content(url):
        """Fetch ``url``, split it, tag the chunks and add them to the index.

        Returns True on success, False (after printing the error) otherwise.
        """
        try:
            loader = WebBaseLoader(
                web_paths=[url],
                bs_kwargs=dict(
                    parse_only=bs4.SoupStrainer(
                        # Universal content detection
                        ['article', 'main', 'div', 'section', 'content'],
                        class_=lambda value: value and any(
                            kw in value.lower()
                            for kw in ['content', 'article', 'main', 'body', 'text']
                        )
                    )
                )
            )
            docs = loader.load()
            splits = text_splitter.split_documents(docs)
            if not splits:
                # Report a clear reason instead of handing an empty batch to
                # the vector store.
                raise ValueError("no text content could be extracted")

            for split in splits:
                split.metadata.update({
                    "source_type": "web",
                    "source_url": url,
                    "content_type": self_detect_content_type(split.page_content)
                })

            vectorstore.add_documents(splits)
            print(f"\nAI: Added {len(splits)} chunks from {url}")
            return True
        except Exception as e:
            print(f"\nAI: Error processing {url}: {str(e)}")
            return False

    # Interactive loop
    print("Math Expert System - Type 'exit' to quit")
    print("Input formats:")
    print("- RAG_file_path=\"/path/to/file.pdf\" https://example.com Your question")
    print("- Include math formulas using $...$ notation")

    while True:
        # Read the input outside the catch-all handler below: if stdin is
        # closed or unavailable, the session must end rather than print the
        # same error forever.
        try:
            user_input = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt, NotImplementedError):
            # NOTE(review): NotImplementedError is included on the assumption
            # that a kernel without stdin support raises a subclass of it from
            # input() -- confirm for the target environment.
            print("\nGoodbye!")
            break

        if user_input.lower() in ['exit', 'quit']:
            print("Goodbye!")
            break

        try:
            # Process file paths
            file_paths = re.findall(r'RAG_file_path="([^"]+)"', user_input)
            for fp in file_paths:
                try:
                    if not os.path.exists(fp):
                        raise FileNotFoundError(f"File not found: {fp}")

                    loader = PyPDFLoader(fp) if fp.endswith('.pdf') else TextLoader(fp)
                    docs = loader.load()
                    splits = text_splitter.split_documents(docs)
                    if not splits:
                        raise ValueError("no text content could be extracted")

                    # Add metadata (same keys as web chunks, for consistency)
                    for split in splits:
                        split.metadata.update({
                            "source_type": "file",
                            "source_path": fp,
                            "content_type": self_detect_content_type(split.page_content)
                        })

                    vectorstore.add_documents(splits)
                    print(f"\nAI: Added {len(splits)} chunks from {os.path.basename(fp)}")
                    user_input = user_input.replace(f'RAG_file_path="{fp}"', '').strip()
                except Exception as e:
                    print(f"\nAI: Error processing {fp}: {str(e)}")

            # Process URLs
            urls = re.findall(r'https?://\S+', user_input)
            for url in urls:
                if process_web_content(url):
                    user_input = user_input.replace(url, '').strip()

            # Process remaining input
            user_input = re.sub(r'\s+', ' ', user_input).strip()
            if not user_input:
                continue

            response = chat_agent.invoke(
                {"input": user_input},
                {"configurable": {"session_id": SESSION_ID}}
            )
            print(f"\nAI: {response['response']}")

        except Exception as e:
            # Best effort: report the failure and keep the session alive.
            print(f"Error: {str(e)}")
            continue

    return chat_agent

# Start the assistant. The call blocks while the chat loop inside defined_llm
# is running and evaluates to the chat agent the function returns.
defined_llm(OPENAI_API_KEY=OPENAI_API_KEY, chat_prompt=chat_prompt)


20 bees are sitting on 20 daisies, one bee on each flower. The flowers are arranged in a ring. From time to time 2 bees simultaneously fly in opposite directions (clockwise and counterclockwise), each to its neighboring flower. Can all bees gather on the same daisy at some point? Will your answer be the same for 19 daisies and 19 bees?

In [None]:


def defined_llm(OPENAI_API_KEY, chat_prompt=chat_prompt):
    """Build a history-aware RAG math assistant and run it as a console chat loop.

    The function wires together an OpenAI chat model, an in-memory FAISS index,
    a question-contextualisation step, an answer -> verification -> correction
    pipeline and per-session chat memory. It then reads messages from
    ``input()`` until the user types ``exit`` or ``quit`` (or the input stream
    ends). ``RAG_file_path="..."`` tokens and http(s) URLs found in a message
    are indexed first; whatever text remains is answered.

    NOTE(review): an earlier cell of this notebook also defines
    ``defined_llm``; running this cell replaces that definition.

    Args:
        OPENAI_API_KEY: API key passed to ``ChatOpenAI`` and ``OpenAIEmbeddings``.
        chat_prompt: System prompt used by the answering chain.

    Returns:
        The ``RunnableWithMessageHistory`` driving the chat. Invoking it with
        ``{"input": ...}`` and a ``session_id`` yields a dict whose
        ``"response"`` key holds the answer text.
    """
    # Cap on stored messages per session.
    # NOTE(review): ChatMessageHistory does not appear to declare a
    # `max_messages` field, so the cap of 20 is enforced explicitly in
    # get_memory() below -- confirm against the installed LangChain version.
    max_history_messages = 20

    SESSION_ID = str(uuid4())
    print(f"Session ID: {SESSION_ID}")

    llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY, model="gpt-4-turbo")
    embedding_provider = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=150,
        # The structural separators are regular expressions, so regex matching
        # has to be switched on; otherwise they are searched for as literal
        # text. The trailing generic separators let oversized paragraphs still
        # be cut down towards chunk_size.
        separators=[r'\n\s*•', r'\n\d+\.', r'\n\*', '\n\n', '\n', ' ', ''],
        is_separator_regex=True,
        add_start_index=True
    )

    # The index is seeded with one placeholder text so it exists before any
    # user document has been added.
    vectorstore = FAISS.from_texts(
        texts=["Math Knowledge Base Initialized"],
        embedding=embedding_provider,
        metadatas=[{"source": "system-init"}]
    )
    retriever = vectorstore.as_retriever(
        search_type="mmr",
        search_kwargs={
            "k": 5,
            # NOTE(review): score_threshold is presumably only honoured by the
            # "similarity_score_threshold" search type -- verify it has any
            # effect with "mmr".
            "score_threshold": 0.7,
            "lambda_mult": 0.5
        }
    )

    def self_detect_content_type(text):
        """Label a chunk "math" if it mentions a math keyword, else "general"."""
        math_keywords = ['theorem', 'formula', 'equation', 'proof', 'lemma']
        if any(kw in text.lower() for kw in math_keywords):
            return "math"
        return "general"

    def format_docs(docs):
        """Render retrieved documents as one context string (500 chars per doc)."""
        # Already-formatted text must pass through untouched: iterating a
        # string would otherwise produce one "Source" entry per character.
        if isinstance(docs, str):
            return docs

        formatted = []
        for i, doc in enumerate(docs):
            # Handle Document objects
            if hasattr(doc, 'metadata'):
                source_type = doc.metadata.get('source_type', 'unknown')
                source = doc.metadata.get('source_path', doc.metadata.get('source_url', 'unknown'))
                content = doc.page_content
            # Handle strings (fallback)
            else:
                source_type = "unknown"
                source = "unknown"
                content = str(doc)

            formatted.append(
                f"📚 Source {i+1} ({source_type}): {source}\n"
                f"{content[:500]}..."
            )
        return "\n\n".join(formatted)

    def debug_retrieval(docs, top_k=3):
        """Print a short preview of the first ``top_k`` retrieved documents.

        Works on documents that were already retrieved, so previewing does not
        trigger a second retrieval call.
        """
        print("\n🔍 Retrieved Context Preview:")
        for i, doc in enumerate(docs[:top_k]):
            print(f"\n📄 Document {i+1}:")
            print(f"   Source: {doc.metadata.get('source_url', doc.metadata.get('source_path', 'unknown'))}")
            print(f"   Content: {doc.page_content[:300]}...")
        return docs

    prompt = ChatPromptTemplate.from_messages([
        ("system", chat_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "Context:\n{context}\n\nQuestion: {input}"),
    ])

    # Core processing chain. It expects "context" to be the already formatted
    # context string; process_query() formats the documents exactly once.
    rag_chain = prompt | llm | StrOutputParser()

    contextualize_q_system_prompt = """Given a chat history and the latest user question:
    1. DO NOT modify or rephrase the original question
    2. Instead, add any relevant background information from the chat history as a prefix
    3. Format the output as:
       [Background Info (if any)]
       Original Question: <exact original question>"""

    contextualize_q_prompt = ChatPromptTemplate.from_messages([
        ("system", contextualize_q_system_prompt),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ])

    # Built once; it does not depend on the individual query.
    standalone_chain = contextualize_q_prompt | llm | StrOutputParser()

    def process_query(input_text, chat_history):
        """Answer one question: contextualise, retrieve, answer, verify, correct.

        Returns the answer followed by the verification text, plus a corrected
        answer when the verification does not contain "VERIFIED".
        """
        standalone_question = standalone_chain.invoke({
            "chat_history": chat_history,
            "input": input_text
        })

        relevant_docs = retriever.invoke(standalone_question)
        debug_retrieval(relevant_docs)
        context = format_docs(relevant_docs)

        response = rag_chain.invoke({
            "input": input_text,
            "context": context,
            "chat_history": chat_history
        })

        # Self-check verification
        check_prompt = f"""Verify this solution contains:
        1. Make sure the logic is correct. Check the method, formulas, each step.
        2. Try to prove the response is incorrect. If it is incorrect, follow the logic and redo the problem.
        3. Prove the response is correct.

        If any issues are found, provide the correction.
        If no issues are found, respond with "VERIFIED" and briefly explain why.
        
        Solution to verify:
        {response}
        
        Missing/incorrect components:"""

        verification = rag_chain.invoke({
            "input": check_prompt,
            "context": context,
            "chat_history": chat_history
        })

        # If verification finds issues, get corrected solution
        if "VERIFIED" not in verification:
            corrected = rag_chain.invoke({
                "input": f"""The previous solution had issues. Please provide a corrected solution that addresses these verification issues:{verification}
                Original question: {input_text}""",
                "context": context,
                "chat_history": chat_history
            })
            return f"{response}\n\n**Original Verification Feedback:**\n{verification}\n\n**Corrected Response:**\n{corrected}"

        return f"{response}\n\n**Verification:**\n{verification}"

    # Memory
    store = {}
    def get_memory(session_id: str) -> ChatMessageHistory:
        """Return the session's history, trimmed to the newest messages."""
        if session_id not in store:
            store[session_id] = ChatMessageHistory()
        history = store[session_id]
        # Drop everything except the last `max_history_messages` entries
        # (a no-op while the history is still short).
        del history.messages[:-max_history_messages]
        return history

    chat_agent = RunnableWithMessageHistory(
        RunnablePassthrough.assign(
            response=lambda x: process_query(x["input"], x["chat_history"])
        ),
        get_memory,
        input_messages_key="input",
        history_messages_key="chat_history",
        # The wrapped runnable returns a dict with several keys, so the key
        # holding the assistant's reply has to be named for it to be stored.
        output_messages_key="response",
    )

    def process_file(file_path):
        """Load a PDF or text file, split it, tag the chunks and index them.

        Returns True on success, False (after printing the error) otherwise.
        """
        try:
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"File not found: {file_path}")

            loader = PyPDFLoader(file_path) if file_path.endswith('.pdf') else TextLoader(file_path)
            docs = loader.load()
            splits = text_splitter.split_documents(docs)
            if not splits:
                # Report a clear reason instead of handing an empty batch to
                # the vector store.
                raise ValueError("no text content could be extracted")

            # Add metadata with content type detection
            for split in splits:
                split.metadata.update({
                    "source_type": "file",
                    "source_path": file_path,
                    "content_type": self_detect_content_type(split.page_content)  # Added content type
                })

            vectorstore.add_documents(splits)
            print(f"\nAI: Added {len(splits)} chunks from {os.path.basename(file_path)}")
            return True
        except Exception as e:
            print(f"\nAI: Error processing {file_path}: {str(e)}")
            return False

    def process_web_content(url):
        """Fetch ``url``, split it, tag the chunks and add them to the index.

        Returns True on success, False (after printing the error) otherwise.
        """
        try:
            loader = WebBaseLoader(
                web_paths=[url],
                bs_kwargs=dict(
                    parse_only=bs4.SoupStrainer(
                        # Universal content detection
                        ['article', 'main', 'div', 'section', 'content'],
                        class_=lambda value: value and any(
                            kw in value.lower()
                            for kw in ['content', 'article', 'main', 'body', 'text']
                        )
                    )
                )
            )
            docs = loader.load()
            splits = text_splitter.split_documents(docs)
            if not splits:
                raise ValueError("no text content could be extracted")

            for split in splits:
                split.metadata.update({
                    "source_type": "web",
                    "source_url": url,
                    "content_type": self_detect_content_type(split.page_content)
                })

            vectorstore.add_documents(splits)
            print(f"\nAI: Added {len(splits)} chunks from {url}")
            return True
        except Exception as e:
            print(f"\nAI: Error processing {url}: {str(e)}")
            return False

    # Interactive loop
    print("Math Expert System - Type 'exit' to quit")
    print("Input formats:")
    print("- RAG_file_path=\"/path/to/file.pdf\" https://example.com Your question")
    print("- Include math formulas using $...$ notation")

    while True:
        # Read the input outside the catch-all handler below: if stdin is
        # closed or unavailable, the session must end rather than print the
        # same error forever.
        try:
            user_input = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt, NotImplementedError):
            # NOTE(review): NotImplementedError is included on the assumption
            # that a kernel without stdin support raises a subclass of it from
            # input() -- confirm for the target environment.
            print("\nGoodbye!")
            break

        if user_input.lower() in ['exit', 'quit']:
            print("Goodbye!")
            break

        try:
            # Process file paths
            file_paths = re.findall(r'RAG_file_path="([^"]+)"', user_input)
            for fp in file_paths:
                if process_file(fp):
                    user_input = user_input.replace(f'RAG_file_path="{fp}"', '').strip()

            # Process URLs
            urls = re.findall(r'https?://\S+', user_input)
            for url in urls:
                if process_web_content(url):
                    user_input = user_input.replace(url, '').strip()

            # Process remaining input
            user_input = re.sub(r'\s+', ' ', user_input).strip()
            if not user_input:
                continue

            response = chat_agent.invoke(
                {"input": user_input},
                {"configurable": {"session_id": SESSION_ID}}
            )
            print(f"\nAI: {response['response']}")

        except Exception as e:
            # Best effort: report the failure and keep the session alive.
            print(f"Error: {str(e)}")
            continue

    return chat_agent

