<a href="https://colab.research.google.com/github/AIDaniel1/AI-Project-/blob/main/AI_agent_pipe_line.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
# ai_agent_pipeline.py
import subprocess
import sys

def install_requirements():
    """Install required packages"""
    packages = [
        "chromadb", "langchain-openai", "openai", "fastapi",
        "uvicorn", "streamlit", "python-dotenv", "requests",
        "sentence-transformers", "nest-asyncio"
    ]

    for package in packages:
        try:
            __import__(package.replace("-", "_"))
            print(f"‚úÖ {package} already installed")
        except ImportError:
            print(f"üì¶ Installing {package}...")
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# Install dependencies first
install_requirements()

üì¶ Installing chromadb...
üì¶ Installing langchain-openai...
‚úÖ openai already installed
‚úÖ fastapi already installed
‚úÖ uvicorn already installed
üì¶ Installing streamlit...
üì¶ Installing python-dotenv...
‚úÖ requests already installed
‚úÖ sentence-transformers already installed
‚úÖ nest-asyncio already installed


In [9]:
import chromadb
from chromadb.config import Settings
import uuid
from typing import List, Dict, Any
import os

class VectorDatabase:
    def __init__(self, persist_directory="./chroma_db"):
        try:
            # Create directory if it doesn't exist
            os.makedirs(persist_directory, exist_ok=True)

            self.client = chromadb.PersistentClient(path=persist_directory)
            self.collection = self.client.get_or_create_collection(
                name="knowledge_base",
                metadata={"description": "AI Agent Knowledge Base"}
            )
            print("‚úÖ Vector database initialized successfully")
        except Exception as e:
            print(f"‚ùå Error initializing vector database: {e}")
            raise

    def add_documents(self, documents: List[str], metadatas: List[Dict] = None, ids: List[str] = None):
        """Add documents to the vector database"""
        try:
            if ids is None:
                ids = [str(uuid.uuid4()) for _ in documents]

            if metadatas is None:
                metadatas = [{} for _ in documents]

            self.collection.add(
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )
            print(f"‚úÖ Added {len(documents)} documents to vector database")
        except Exception as e:
            print(f"‚ùå Error adding documents: {e}")
            raise

    def search(self, query: str, n_results: int = 3) -> List[Dict]:
        """Search for similar documents"""
        try:
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results
            )

            formatted_results = []
            for i in range(len(results['documents'][0])):
                formatted_results.append({
                    "document": results['documents'][0][i],
                    "metadata": results['metadatas'][0][i] if results['metadatas'] else {},
                    "distance": results['distances'][0][i] if results['distances'] else 0
                })

            return formatted_results
        except Exception as e:
            print(f"‚ùå Error searching vector database: {e}")
            return []

    def get_all_documents(self) -> Dict[str, Any]:
        """Get all documents from the collection"""
        try:
            return self.collection.get()
        except Exception as e:
            print(f"‚ùå Error getting all documents: {e}")
            return {'ids': [], 'documents': [], 'metadatas': []}

def initialize_sample_data():
    """Initialize with sample data"""
    db = VectorDatabase()

    # Check if collection is empty
    existing_docs = db.get_all_documents()
    if len(existing_docs['ids']) > 0:
        print("‚úÖ Sample data already exists")
        return db

    sample_documents = [
        "Machine learning is a subset of artificial intelligence that enables computers to learn without being explicitly programmed.",
        "Natural Language Processing (NLP) is a field of AI that focuses on the interaction between computers and human language.",
        "Vector databases store data as high-dimensional vectors and enable efficient similarity search.",
        "Large Language Models (LLMs) are AI models trained on vast amounts of text data to understand and generate human-like text.",
        "Retrieval-Augmented Generation (RAG) combines retrieval systems with generative models for more accurate responses.",
        "AI agents are autonomous systems that can perceive, reason, and act to achieve specific goals."
    ]

    sample_metadata = [
        {"category": "ML", "source": "knowledge_base"},
        {"category": "NLP", "source": "knowledge_base"},
        {"category": "Database", "source": "knowledge_base"},
        {"category": "LLM", "source": "knowledge_base"},
        {"category": "RAG", "source": "knowledge_base"},
        {"category": "Agents", "source": "knowledge_base"}
    ]

    db.add_documents(sample_documents, sample_metadata)
    print("‚úÖ Sample data initialized")
    return db

In [3]:
import openai
from openai import OpenAI
import os
from dotenv import load_dotenv
from typing import List, Dict
import requests

load_dotenv()

class LLMIntegration:
    def __init__(self, api_key: str = None, model: str = "gpt-3.5-turbo"):
        try:
            self.api_key = api_key or os.getenv("OPENAI_API_KEY")
            if not self.api_key:
                raise ValueError("OpenAI API key not found. Please set OPENAI_API_KEY environment variable.")

            self.client = OpenAI(api_key=self.api_key)
            self.model = model
            print("‚úÖ LLM integration initialized successfully")
        except Exception as e:
            print(f"‚ùå Error initializing LLM integration: {e}")
            raise

    def generate_response(self, prompt: str, context: str = "", conversation_history: List[Dict] = None) -> str:
        """Generate response using LLM with context"""
        try:
            system_message = """You are an AI assistant with access to a knowledge base.
            Use the provided context to answer questions accurately. If the context doesn't
            contain relevant information, use your general knowledge but indicate this."""

            messages = [{"role": "system", "content": system_message}]

            # Add conversation history if provided
            if conversation_history:
                messages.extend(conversation_history[-6:])  # Keep last 3 exchanges

            # Add context if available
            if context:
                enhanced_prompt = f"Context:\n{context}\n\nQuestion: {prompt}\n\nPlease provide a helpful answer based on the context above."
            else:
                enhanced_prompt = f"Question: {prompt}\n\nPlease provide a helpful answer."

            messages.append({"role": "user", "content": enhanced_prompt})

            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0.7,
                max_tokens=500
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"I apologize, but I encountered an error: {str(e)}. Please check your API key and try again."

    def format_context(self, search_results: List[Dict]) -> str:
        """Format search results into context string"""
        if not search_results:
            return "No relevant context found."

        context_parts = ["Relevant information from knowledge base:"]
        for i, result in enumerate(search_results, 1):
            context_parts.append(f"{i}. {result['document']}")

        return "\n".join(context_parts)

# Fallback LLM class for testing without API key
class MockLLMIntegration:
    def __init__(self):
        print("‚úÖ Mock LLM integration initialized (for testing)")

    def generate_response(self, prompt: str, context: str = "", conversation_history: List[Dict] = None) -> str:
        return f"This is a mock response for: '{prompt}'. Context provided: {len(context) if context else 0} characters."

    def format_context(self, search_results: List[Dict]) -> str:
        return "Mock context with " + str(len(search_results)) + " search results"

In [10]:
# main.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import uvicorn

app = FastAPI(title="AI Agent Pipeline API")

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize components
vector_db = VectorDatabase()
llm = LLMIntegration()
agent = AIAgentPipeline(vector_db, llm)

# Request/Response models
class QueryRequest(BaseModel):
    message: str
    use_rag: bool = True

class QueryResponse(BaseModel):
    response: str
    context_used: str
    search_results: List[Dict]
    conversation_turn: int

class AddKnowledgeRequest(BaseModel):
    documents: List[str]
    metadatas: Optional[List[Dict]] = None

@app.post("/query", response_model=QueryResponse)
async def process_query(request: QueryRequest):
    """Process user query through AI agent pipeline"""
    try:
        result = agent.process_query(request.message, request.use_rag)
        return QueryResponse(**result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/knowledge/add")
async def add_knowledge(request: AddKnowledgeRequest):
    """Add new knowledge to the vector database"""
    try:
        agent.add_knowledge(request.documents, request.metadatas)
        return {"message": "Knowledge added successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/conversation/clear")
async def clear_conversation():
    """Clear conversation history"""
    agent.clear_history()
    return {"message": "Conversation history cleared"}

@app.get("/stats")
async def get_stats():
    """Get pipeline statistics"""
    return agent.get_stats()

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "service": "AI Agent Pipeline API"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

‚úÖ Vector database initialized successfully
‚ùå Error initializing LLM integration: OpenAI API key not found. Please set OPENAI_API_KEY environment variable.


ValueError: OpenAI API key not found. Please set OPENAI_API_KEY environment variable.

In [11]:
# app.py
import streamlit as st
import requests
import json
from typing import List, Dict

# Configuration
API_BASE_URL = "http://localhost:8000"

def initialize_session_state():
    """Initialize session state variables"""
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "use_rag" not in st.session_state:
        st.session_state.use_rag = True

def call_api(endpoint: str, method: str = "GET", data: Dict = None):
    """Make API call to backend"""
    url = f"{API_BASE_URL}{endpoint}"

    try:
        if method == "GET":
            response = requests.get(url)
        elif method == "POST":
            response = requests.post(url, json=data)

        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        st.error(f"API Error: {str(e)}")
        return None

def main():
    st.set_page_config(
        page_title="AI Agent Pipeline",
        page_icon="ü§ñ",
        layout="wide"
    )

    st.title("ü§ñ AI Agent Pipeline")
    st.markdown("Chat with your AI agent powered by Vector Database and LLM")

    initialize_session_state()

    # Sidebar for configuration
    with st.sidebar:
        st.header("Configuration")

        # RAG toggle
        st.session_state.use_rag = st.toggle(
            "Use RAG (Retrieval-Augmented Generation)",
            value=st.session_state.use_rag
        )

        # Add knowledge section
        st.subheader("Add Knowledge")
        with st.form("add_knowledge"):
            new_doc = st.text_area("New Document")
            doc_category = st.text_input("Category")

            if st.form_submit_button("Add to Knowledge Base"):
                if new_doc:
                    metadata = {"category": doc_category} if doc_category else {}
                    result = call_api(
                        "/knowledge/add",
                        "POST",
                        {"documents": [new_doc], "metadatas": [metadata]}
                    )
                    if result:
                        st.success("Document added to knowledge base!")

        # Stats and controls
        st.subheader("System Info")
        if st.button("Get Statistics"):
            stats = call_api("/stats")
            if stats:
                st.json(stats)

        if st.button("Clear Conversation"):
            call_api("/conversation/clear", "POST")
            st.session_state.messages = []
            st.rerun()

    # Main chat interface
    col1, col2 = st.columns([3, 1])

    with col1:
        # Display chat messages
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])

                # Show context if available and RAG is enabled
                if message.get("context") and st.session_state.use_rag:
                    with st.expander("üìö Context Used"):
                        st.text(message["context"])

    with col2:
        st.subheader("Pipeline Flow")
        st.markdown("""
        ```
        User Query
            ‚Üì
        Vector DB Search
            ‚Üì
        Context + Query
            ‚Üì
        LLM Processing
            ‚Üì
        Response
        ```
        """)

        if st.session_state.messages:
            latest_message = st.session_state.messages[-1]
            if latest_message.get("search_results"):
                st.subheader("Search Results")
                for i, result in enumerate(latest_message["search_results"][:3]):
                    with st.expander(f"Result {i+1}"):
                        st.text(result["document"][:100] + "...")

    # Chat input
    if prompt := st.chat_input("What would you like to know?"):
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        # Get AI response
        with st.chat_message("assistant"):
            with st.spinner("Thinking..."):
                result = call_api(
                    "/query",
                    "POST",
                    {"message": prompt, "use_rag": st.session_state.use_rag}
                )

                if result:
                    response = result["response"]
                    st.markdown(response)

                    # Add assistant response with context to history
                    st.session_state.messages.append({
                        "role": "assistant",
                        "content": response,
                        "context": result.get("context_used", ""),
                        "search_results": result.get("search_results", [])
                    })
                else:
                    st.error("Failed to get response from AI agent")

if __name__ == "__main__":
    main()

2025-11-08 23:30:10.813 
  command:

    streamlit run /usr/local/lib/python3.12/dist-packages/colab_kernel_launcher.py [ARGUMENTS]
2025-11-08 23:30:10.819 Session state does not function when running a script without `streamlit run`
