### Install the dependencies

In [1]:
!pip install gradio langchain langgraph pydantic transformers chromadb sentence-transformers

Collecting gradio
  Downloading gradio-5.23.1-py3-none-any.whl.metadata (16 kB)
Collecting langgraph
  Downloading langgraph-0.3.20-py3-none-any.whl.metadata (7.7 kB)
Collecting chromadb
  Downloading chromadb-0.6.3-py3-none-any.whl.metadata (6.8 kB)
Collecting aiofiles<24.0,>=22.0 (from gradio)
  Downloading aiofiles-23.2.1-py3-none-any.whl.metadata (9.7 kB)
Collecting fastapi<1.0,>=0.115.2 (from gradio)
  Downloading fastapi-0.115.12-py3-none-any.whl.metadata (27 kB)
Collecting ffmpy (from gradio)
  Downloading ffmpy-0.5.0-py3-none-any.whl.metadata (3.0 kB)
Collecting gradio-client==1.8.0 (from gradio)
  Downloading gradio_client-1.8.0-py3-none-any.whl.metadata (7.1 kB)
Collecting groovy~=0.1 (from gradio)
  Downloading groovy-0.1.2-py3-none-any.whl.metadata (6.1 kB)
Collecting pydub (from gradio)
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting python-multipart>=0.0.18 (from gradio)
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)


In [2]:
# The version specifier must be quoted: unquoted, the shell treats `>=0.0.20` as a
# redirect of stdout into a file named "=0.0.20" and pip never sees the constraint.
!pip install -q gradio langchain langchain-community "langgraph>=0.0.20" transformers torch sentence-transformers chromadb networkx

### Create directory for source files

In [3]:
import requests
import zipfile
import io
import os

# Create inline source files for the system components

# Create directory structure
!mkdir -p agentic_rag-mcp_system

In [4]:
# Change to that directory
%cd agentic_rag-mcp_system

/content/agentic_rag-mcp_system


### Model Context Protocol Implementation Block

In [5]:
%%writefile full_mcp_implementation.py
import os
import logging
import re
import json
from typing import Dict, List, Any, Optional, Union
from abc import ABC, abstractmethod
from pathlib import Path

# Configure MCP logging.
# This runs at import time and sets up the root logger: INFO level, with each
# record rendered as "<timestamp> - <logger name> - <level> - <message>".
# NOTE: logging.basicConfig() is a no-op if the root logger already has handlers.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-wide logger used by the agents and the registry defined in this file.
logger = logging.getLogger("mcp_system")

# Define MCP framework components
# The classes below define the shared agent context, the agent base class, and one specialized agent per task:
# Device Search, Troubleshooting, Observability, Knowledge Base, and Incident Resolution

class AgentContext:
    """Context container for sharing information between agents.

    Attributes (all start empty):
        conversation_history: ordered list of ``{"role": ..., "content": ...}`` dicts.
        retrieved_knowledge: most recent knowledge payload stored per agent type.
        entity_memory: nested mapping ``{entity_type: {entity_id: properties}}``.
        execution_state: free-form key/value store.
        last_agent: type of the agent that last recorded itself, or ``None``.
    """

    def __init__(self):
        self.conversation_history = []
        self.retrieved_knowledge = {}
        self.entity_memory = {}
        self.execution_state = {}
        self.last_agent = None

    def add_message(self, role: str, content: str):
        """Append a message to the conversation history."""
        self.conversation_history.append({"role": role, "content": content})

    def add_knowledge(self, agent_type: str, knowledge: Dict[str, Any]):
        """Store retrieved knowledge for an agent type, replacing any previous entry."""
        self.retrieved_knowledge[agent_type] = knowledge

    def remember_entity(self, entity_type: str, entity_id: str, properties: Dict[str, Any]):
        """Remember an entity (e.g. a device or service) under its type and id.

        An existing entry with the same type and id is overwritten.
        """
        self.entity_memory.setdefault(entity_type, {})[entity_id] = properties

    def get_conversation_summary(self, last_n=5):
        """Return the last ``last_n`` conversation messages, oldest first.

        Args:
            last_n: maximum number of trailing messages to return.

        Returns:
            A new list with at most ``last_n`` message dicts. A non-positive
            ``last_n`` yields an empty list.
        """
        # Guard explicitly: history[-0:] is history[0:], i.e. the *entire* history,
        # which is the opposite of "the last zero messages".
        if last_n <= 0:
            return []
        return self.conversation_history[-last_n:]

    def update_state(self, key: str, value: Any):
        """Set ``key`` to ``value`` in the execution state."""
        self.execution_state[key] = value

    def get_state(self, key: str, default=None):
        """Return the execution-state value for ``key``, or ``default`` if absent."""
        return self.execution_state.get(key, default)

    def set_last_agent(self, agent_type: str):
        """Record the type of the agent that last processed a request."""
        self.last_agent = agent_type

    def get_last_agent(self):
        """Return the type of the agent that last processed a request (or ``None``)."""
        return self.last_agent

class BaseAgent(ABC):
    """Abstract parent of every MCP agent.

    Holds the agent's identity (id and type), the list of capability names it
    advertises, and an optional RAG system that is attached after construction.
    """

    def __init__(self, agent_id: str, agent_type: str, capabilities: List[str]):
        # No RAG system until set_rag_system() is called.
        self.rag_system = None
        self.agent_id, self.agent_type = agent_id, agent_type
        self.capabilities = capabilities

    def set_rag_system(self, rag_system):
        """Attach the RAG system this agent uses for knowledge retrieval."""
        self.rag_system = rag_system

    @abstractmethod
    def process(self, query: str, context: AgentContext) -> Dict[str, Any]:
        """Handle ``query`` using the shared ``context`` and return a response dict."""

    def can_handle(self, capability: str) -> bool:
        """Tell whether ``capability`` is among this agent's advertised capabilities."""
        advertised = self.capabilities
        return capability in advertised

    def get_metadata(self) -> Dict[str, Any]:
        """Describe the agent as a dict of its id, type and capabilities."""
        return {
            field: getattr(self, field)
            for field in ("agent_id", "agent_type", "capabilities")
        }

class TroubleshootingAgent(BaseAgent):
    """Agent specialized in network troubleshooting.

    The attached RAG system is queried under the ``"troubleshooting"`` key; the
    generated text is returned together with its sources and a list of
    suggested actions extracted from bullet/numbered lines.
    """

    # A list line: "- text", "* text" or "<digits>. text". The negative lookahead
    # keeps decimals and IP addresses ("1.5 GHz", "10.0.0.1 ...") from being
    # mistaken for numbered items.
    _LIST_ITEM = re.compile(r"^(?:[-*]\s+|\d+\.(?!\d)\s*)(.+)$")

    def __init__(self, agent_id: str):
        capabilities = [
            "diagnose_network_issues",
            "interpret_error_messages",
            "suggest_resolution_steps"
        ]
        super().__init__(agent_id, "troubleshooting", capabilities)

    def process(self, query: str, context: AgentContext) -> Dict[str, Any]:
        """Process a troubleshooting query.

        Args:
            query: the user's troubleshooting question.
            context: shared context; receives the retrieved knowledge, the
                ``"referenced_device"`` state (when a remembered device is
                mentioned in the query), the agent message and the last-agent marker.

        Returns:
            ``{"analysis", "sources", "suggested_actions"}`` on success, or
            ``{"error": ...}`` when no RAG system is attached.
        """
        logger.info(f"TroubleshootingAgent processing query: {query}")

        if not self.rag_system:
            return {"error": "RAG system not available"}

        # Use RAG system to retrieve relevant knowledge
        rag_result = self.rag_system.query("troubleshooting", query)
        context.add_knowledge("troubleshooting", rag_result)

        # Remember any devices mentioned in the query
        query_lower = query.lower()
        for device_id, device in context.entity_memory.get("device", {}).items():
            # Skip empty/missing ids and names: "" is a substring of every
            # string, so a device with a blank name would otherwise match any query.
            candidates = (device_id, device.get("name"))
            if any(term and term.lower() in query_lower for term in candidates):
                logger.info(f"Found referenced device in query: {device_id}")
                context.update_state("referenced_device", device_id)

        response = {
            "analysis": rag_result["response"],
            "sources": rag_result["sources"],
            "suggested_actions": self._extract_actions(rag_result["response"])
        }

        # Update context
        context.add_message("agent", json.dumps(response))
        context.set_last_agent("troubleshooting")

        return response

    def _extract_actions(self, text: str) -> List[str]:
        """Extract suggested actions from the response text.

        Every line that (after stripping) starts with ``- ``, ``* `` or a number
        followed by a dot (``1.``, ``12.``) contributes its remaining text as
        one action. Other lines are ignored.
        """
        actions = []
        for line in text.split('\n'):
            item = self._LIST_ITEM.match(line.strip())
            if item:
                actions.append(item.group(1).strip())
        return actions

class DeviceSearchAgent(BaseAgent):
    """Agent specialized in finding network devices.

    Uses an optional LangGraph agent (a callable taking the query) first and
    falls back to the attached RAG system when no LangGraph agent is set or
    when calling it raises.
    """

    # One structured inventory record. The id may contain '-' or '.' between
    # word characters (e.g. "RTR-001"), which a bare \w+ would reject.
    _INVENTORY_RECORD = re.compile(
        r"Device ID: (\w+(?:[-.]\w+)*)[\s\n]+Name: ([^\n]+)[\s\n]+Type: ([^\n]+)[\s\n]+Status: ([^\n]+)",
        re.IGNORECASE,
    )
    # "key: value" labels recognised when scanning free text.
    _ID_KEYS = ("device id", "id")
    _NAME_KEYS = ("device", "name")
    _ATTRIBUTE_KEYS = ("type", "status", "location", "importance")

    def __init__(self, agent_id: str, langgraph_agent=None):
        capabilities = [
            "find_devices",
            "analyze_topology",
            "assess_impact"
        ]
        super().__init__(agent_id, "device_search", capabilities)
        # Store reference to LangGraph agent if provided
        self.langgraph_agent = langgraph_agent

    def process(self, query: str, context: AgentContext) -> Dict[str, Any]:
        """Process a device search query.

        Args:
            query: the user's device search request.
            context: shared context; found devices are remembered as
                ``"device"`` entities and the response is appended as an agent message.

        Returns:
            LangGraph path: ``{"devices", "upstream_devices",
            "downstream_devices", "affected_services"}``, or ``{"error": ...}``
            when the LangGraph result does not report success.
            RAG path: ``{"devices", "topology_analysis", "sources"}``.
            ``{"error": ...}`` when neither backend is usable.
        """
        logger.info(f"DeviceSearchAgent processing query: {query}")

        # If LangGraph agent is available, use it
        if self.langgraph_agent:
            try:
                logger.info("Using LangGraph agent for device search")
                langgraph_result = self.langgraph_agent(query)

                if not langgraph_result.get("success", False):
                    error_msg = langgraph_result.get("error", "Unknown error")
                    logger.error(f"LangGraph agent error: {error_msg}")
                    return {"error": error_msg}

                # Extract found devices
                found_devices = langgraph_result.get("found_devices", [])

                # Remember devices in context (devices without a ci_id are skipped)
                for device in found_devices:
                    device_id = device.get("ci_id", "")
                    if device_id:
                        context.remember_entity("device", device_id, {
                            "id": device_id,
                            "name": device.get("name", ""),
                            "type": device.get("ci_type", ""),
                            "status": device.get("status", ""),
                            "location": device.get("location", ""),
                            "importance": device.get("importance", "")
                        })

                # Create response using LangGraph results
                response = {
                    "devices": found_devices,
                    "upstream_devices": langgraph_result.get("upstream_devices", {}),
                    "downstream_devices": langgraph_result.get("downstream_devices", {}),
                    "affected_services": langgraph_result.get("affected_services", {})
                }

                # Update context
                context.add_message("agent", json.dumps(response))
                context.set_last_agent("device_search")

                return response

            except Exception as e:
                # Deliberate best-effort: log and fall through to the RAG-based approach.
                logger.error(f"Error using LangGraph agent: {e}")

        # Use RAG system as fallback or primary approach
        if not self.rag_system:
            return {"error": "Neither LangGraph nor RAG system available"}

        # First, get general device knowledge
        rag_result = self.rag_system.query("device_search", query)
        context.add_knowledge("device_search", rag_result)

        # Try to find specific devices in inventory if available
        device_inventory = None
        try:
            inventory_result = self.rag_system.query("device_inventory", query)
            if inventory_result and "retrieved_content" in inventory_result:
                device_inventory = inventory_result
                context.add_knowledge("device_inventory", inventory_result)
        except Exception as e:
            logger.warning(f"Error retrieving device inventory: {e}")

        # Extract and remember devices
        devices = self._extract_devices(rag_result["response"], device_inventory)
        for device in devices:
            context.remember_entity("device", device["id"], device)

        response = {
            "devices": devices,
            "topology_analysis": self._analyze_topology(devices, context),
            "sources": rag_result["sources"]
        }

        # Update context
        context.add_message("agent", json.dumps(response))
        context.set_last_agent("device_search")

        return response

    def _extract_devices(self, text: str, inventory=None) -> List[Dict[str, Any]]:
        """Extract device information from response text and/or inventory.

        Structured records in ``inventory["response"]`` ("Device ID: ... Name:
        ... Type: ... Status: ...") take precedence. Only when none are found
        is ``text`` scanned line by line for "key: value" pairs.

        Returns:
            A list of device dicts; every dict has an ``"id"`` key.
        """
        devices = []

        # First try to extract from inventory if available
        if inventory and "response" in inventory:
            try:
                for device_id, name, device_type, status in self._INVENTORY_RECORD.findall(inventory["response"]):
                    devices.append({
                        "id": device_id.strip(),
                        "name": name.strip(),
                        "type": device_type.strip(),
                        "status": status.strip()
                    })
            except Exception as e:
                logger.warning(f"Error extracting devices from inventory: {e}")

        # If no devices extracted from inventory, try from general response
        if not devices:
            devices = self._extract_devices_from_text(text)

        return devices

    def _extract_devices_from_text(self, text: str) -> List[Dict[str, Any]]:
        """Scan free text for "key: value" lines and group them into devices.

        A new device starts when an id line arrives while the current device
        already has an explicit id, or when a name line arrives while the
        current device already has a name. An id line followed by a name line
        (or the reverse) therefore describes ONE device instead of being split
        in two. Devices that only have a name get a placeholder id
        ``DEV<nnn>``, which a later explicit id line replaces.
        """
        devices = []
        current = {}
        id_is_placeholder = False

        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if ":" not in line:
                continue

            key, value = line.split(":", 1)
            key = key.strip().lower()
            value = value.strip()

            if key in self._ID_KEYS:
                if not value:
                    continue  # an empty id cannot identify a device
                if "id" in current and not id_is_placeholder:
                    devices.append(current)
                    current = {}
                current["id"] = value
                id_is_placeholder = False
            elif key in self._NAME_KEYS:
                if not value:
                    continue  # bare "Device:" headings carry no name
                if "name" in current:
                    devices.append(current)
                    current = {}
                current["name"] = value
                if "id" not in current:
                    # Generate an ID if none exists
                    current["id"] = f"DEV{len(devices):03d}"
                    id_is_placeholder = True
            elif key in self._ATTRIBUTE_KEYS and current:
                # Capture other device attributes
                current[key] = value

        # Add the last device being tracked
        if current and "id" in current:
            devices.append(current)

        return devices

    def _analyze_topology(self, devices: List[Dict[str, Any]], context: AgentContext) -> Dict[str, Any]:
        """Analyze topology relationships between devices.

        Simplified rule set: every device of type "router" is treated as the
        uplink of every device of type "switch". A connection whose router has
        importance "critical" or "high" is also reported as a critical path.
        ``context`` is accepted for interface compatibility and is not used.

        Returns:
            ``{"connections": [...], "dependencies": [...], "critical_paths": [...]}``.
        """
        topology = {
            "connections": [],
            "dependencies": [],
            "critical_paths": []
        }

        def type_of(device):
            # Tolerate a missing or None type.
            return (device.get("type") or "").lower()

        routers = [d for d in devices if type_of(d) == "router"]
        switches = [d for d in devices if type_of(d) == "switch"]

        # Simple topology rules: router -> switch uplinks
        for router in routers:
            for switch in switches:
                topology["connections"].append({
                    "from": router["id"],
                    "to": switch["id"],
                    "type": "uplink"
                })
                topology["dependencies"].append({
                    "dependent": switch["id"],
                    "depends_on": router["id"],
                    "reason": "Routing dependency"
                })

        # First device wins for duplicate ids, matching a first-match linear search.
        by_id = {}
        for device in devices:
            by_id.setdefault(device["id"], device)

        # Check for critical paths
        for conn in topology["connections"]:
            from_device = by_id.get(conn["from"])
            if from_device and (from_device.get("importance") or "").lower() in ["critical", "high"]:
                topology["critical_paths"].append({
                    "path_id": f"PATH{len(topology['critical_paths']):03d}",
                    "devices": [conn["from"], conn["to"]],
                    "importance": from_device.get("importance", "unknown")
                })

        return topology

class KnowledgeBaseAgent(BaseAgent):
    """Agent specialized in retrieving knowledge base information.

    The query may carry an optional ``document type: <type>`` suffix (matched
    case-insensitively). The remaining text is enriched with conversational
    context before it is sent to the RAG system under the ``"knowledge_base"`` key.
    """

    # Terms reported as related topics, in the order they are returned.
    _TECH_TERMS = (
        "router", "switch", "firewall", "VPN", "ACL", "QoS", "VLAN",
        "routing", "switching", "security", "performance", "latency",
        "DNS", "DHCP", "BGP", "OSPF", "spanning tree", "NAT"
    )
    # Content-derived topics are only added while fewer than this many are known.
    _MAX_TOPICS_FROM_CONTENT = 10

    def __init__(self, agent_id: str):
        capabilities = [
            "answer_questions",
            "provide_references",
            "explain_concepts"
        ]
        super().__init__(agent_id, "knowledge_base", capabilities)

    def process(self, query: str, context: AgentContext) -> Dict[str, Any]:
        """Process a knowledge base query.

        Args:
            query: the question, optionally followed by ``document type: <type>``.
            context: shared context used to enrich the query; receives the
                retrieved knowledge, the agent message and the last-agent marker.

        Returns:
            ``{"answer", "sources", "related_topics"}`` on success, or
            ``{"error": ...}`` when no RAG system is attached.
        """
        logger.info(f"KnowledgeBaseAgent processing query: {query}")

        # Parse document type if specified. The split must be case-insensitive
        # like the detection: splitting "Document Type: x" on the lower-case
        # literal yields a single part and indexing parts[1] raises IndexError.
        doc_type = None
        query_text = query
        parts = re.split(r"document type:", query, maxsplit=1, flags=re.IGNORECASE)
        if len(parts) == 2:
            query_text = parts[0].strip()
            doc_type = parts[1].strip()
            logger.info(f"Detected document type filter: {doc_type}")

        # Enrich query with context from conversation
        enriched_query = self._enrich_query(query_text, context)
        if doc_type:
            enriched_query = f"{enriched_query} (document type: {doc_type})"

        if not self.rag_system:
            return {"error": "RAG system not available"}

        # Use RAG system to retrieve knowledge
        rag_result = self.rag_system.query("knowledge_base", enriched_query)
        context.add_knowledge("knowledge_base", rag_result)

        response = {
            "answer": rag_result["response"],
            "sources": rag_result["sources"],
            "related_topics": self._extract_related_topics(
                rag_result["response"], rag_result.get("retrieved_content") or []
            )
        }

        # Update context
        context.add_message("agent", json.dumps(response))
        context.set_last_agent("knowledge_base")

        return response

    def _enrich_query(self, query: str, context: AgentContext) -> str:
        """Enrich the query with conversational context.

        Appends a "(regarding <type> <name>)" note when the context state names
        a referenced device that is present in entity memory, and a
        "(context: ...)" note built from the user messages among the last two
        conversation entries (truncated to 100 characters).
        """
        enriched = query

        # Add device context if available
        referenced_device_id = context.get_state("referenced_device")
        if referenced_device_id and "device" in context.entity_memory:
            device = context.entity_memory["device"].get(referenced_device_id)
            if device:
                enriched += f" (regarding {device.get('type', 'device')} {device.get('name', referenced_device_id)})"

        # Add context from last few conversation rounds
        last_msgs = context.get_conversation_summary(last_n=2)
        recent_context = " ".join(msg["content"] for msg in last_msgs if msg["role"] == "user")
        if recent_context:
            enriched = f"{enriched} (context: {recent_context[:100]}...)"

        return enriched

    def _extract_related_topics(self, response: str, content: List[str]) -> List[str]:
        """Extract related topics from the response and retrieved content.

        Matching is a case-insensitive substring test against a fixed list of
        technical terms. Terms found in ``response`` come first (in term-list
        order), followed by additional terms found in ``content`` while fewer
        than ten topics are known. The result order is deterministic.
        """
        response_lower = response.lower()
        topics = [term for term in self._TECH_TERMS if term.lower() in response_lower]

        # Extract from retrieved content, only adding a few more to avoid overload
        for doc in content:
            doc_lower = doc.lower()
            for term in self._TECH_TERMS:
                if len(topics) >= self._MAX_TOPICS_FROM_CONTENT:
                    break
                if term not in topics and term.lower() in doc_lower:
                    topics.append(term)

        return topics

class ObservabilityAgent(BaseAgent):
    """Agent specialized in network monitoring and observability.

    Parses CI types, metric names and a time range out of the query, asks the
    RAG system (key ``"observability"``) for an assessment, and records which
    CI types were analyzed in the shared context state.
    """

    # Time-range tokens recognised in a query, checked in this order.
    _TIME_RANGES = ("last_1h", "last_6h", "last_12h", "last_24h", "last_3d", "last_7d")

    def __init__(self, agent_id: str):
        capabilities = [
            "analyze_metrics",
            "detect_anomalies",
            "forecast_trends",
            "health_assessment"
        ]
        super().__init__(agent_id, "observability", capabilities)

    def process(self, query: str, context: AgentContext) -> Dict[str, Any]:
        """Process an observability query.

        Args:
            query: free text, optionally containing ``ci types:``/``devices:``,
                ``metrics:`` labels and a ``last_*`` time-range token.
            context: shared context; receives the retrieved knowledge, one
                ``analyzed_<ci_type>`` state flag per CI type, the agent
                message and the last-agent marker.

        Returns:
            ``{"assessment", "sources", "parameters"}`` on success, or
            ``{"error": ...}`` when no RAG system is attached.
        """
        logger.info(f"ObservabilityAgent processing query: {query}")

        # Parse metric parameters from query
        ci_types, metrics, time_range = self._parse_metrics_query(query, context)
        logger.info(f"Parsed metrics query: CI Types={ci_types}, Metrics={metrics}, Time Range={time_range}")

        if not self.rag_system:
            return {"error": "RAG system not available"}

        # Use RAG system to retrieve knowledge
        formatted_query = f"I need to analyze metrics for CI types: {ci_types}, focusing on these metrics: {metrics}, over time range: {time_range}"
        rag_result = self.rag_system.query("observability", formatted_query)
        context.add_knowledge("observability", rag_result)

        response = {
            "assessment": rag_result["response"],
            "sources": rag_result["sources"],
            "parameters": {
                "ci_types": ci_types,
                "metrics": metrics,
                "time_range": time_range
            }
        }

        # Update context - remember analyzed CI types for future reference
        for ci_type in ci_types.split(","):
            ci_type = ci_type.strip()
            if ci_type:
                context.update_state(f"analyzed_{ci_type}", True)

        context.add_message("agent", json.dumps(response))
        context.set_last_agent("observability")

        return response

    @staticmethod
    def _labelled_value(text: str, label: str) -> Optional[str]:
        """Return the text following ``<label>:`` up to the first comma.

        Returns ``None`` when the label does not occur and ``""`` when it
        occurs with nothing after it.
        """
        # maxsplit is passed by keyword: positional use is deprecated in re.split.
        parts = re.split(rf"{re.escape(label)}\s*:", text, maxsplit=1)
        if len(parts) < 2:
            return None
        # NOTE(review): only the first comma-separated item is kept, so
        # "ci types: router, switch" yields "router". Confirm whether callers
        # use commas to separate fields or list items before changing this.
        return parts[1].split(",", 1)[0].strip()

    def _parse_metrics_query(self, query: str, context: AgentContext) -> tuple:
        """Parse CI types, metrics, and time range from query.

        Precedence for CI types: a value given in the query (``ci types:``, or
        ``devices:`` when no ``ci types:`` label is present) wins; otherwise
        the types of devices remembered in ``context`` are used; otherwise the
        default applies.

        Returns:
            ``(ci_types, metrics, time_range)`` as strings. Defaults are
            ``"router, switch"``, ``"cpu_utilization, latency,
            memory_utilization"`` and ``"last_24h"``.
        """
        ci_types = "router, switch"
        metrics = "cpu_utilization, latency, memory_utilization"
        time_range = "last_24h"

        query_lower = query.lower()

        # Extract CI types ("devices:" is only consulted when "ci types:" is absent)
        explicit_ci_types = self._labelled_value(query_lower, "ci types")
        if explicit_ci_types is None:
            explicit_ci_types = self._labelled_value(query_lower, "devices")
        if explicit_ci_types:
            ci_types = explicit_ci_types

        # Extract metrics
        explicit_metrics = self._labelled_value(query_lower, "metrics")
        if explicit_metrics:
            metrics = explicit_metrics

        # Extract time range (first recognised token wins)
        time_range = next((tr for tr in self._TIME_RANGES if tr in query_lower), time_range)

        # Fall back to device types remembered in context only when the query
        # itself did not name any CI types; an explicit request must not be
        # overridden by earlier conversation state.
        if not explicit_ci_types and "device" in context.entity_memory:
            device_types = {
                (device.get("type") or "").lower()
                for device in context.entity_memory["device"].values()
            }
            device_types.discard("")  # devices remembered without a type
            if device_types:
                # sorted() makes the joined string deterministic
                ci_types = ", ".join(sorted(device_types))

        return ci_types, metrics, time_range

class IncidentResolutionAgent(BaseAgent):
    """Agent specialized in incident management.

    Parses "key: value" incident fields from the query, asks the RAG system
    (key ``"incident_resolution"``) for guidance, extracts action items from
    the generated text and remembers the incident in the shared context.
    """

    # A list line: "- text", "* text" or "<digits>. text" (not a decimal/IP).
    _LIST_ITEM = re.compile(r"^(?:[-*]\s+|\d+\.(?!\d)\s*)(.+)$")
    # Inline owner / deadline markers inside an action item.
    _FIELD_MARKER = re.compile(
        r"\b(?:(?P<owner>owner|assigned to)|(?P<deadline>by|deadline|due))\s*:",
        re.IGNORECASE,
    )
    # Headings that open an action section (the line must also contain ':').
    _ACTION_SECTIONS = (
        "next steps", "action items", "recommendations",
        "required actions", "follow-up"
    )

    def __init__(self, agent_id: str):
        capabilities = [
            "incident_analysis",
            "resolution_guidance",
            "impact_assessment",
            "root_cause_analysis"
        ]
        super().__init__(agent_id, "incident_resolution", capabilities)

    def process(self, query: str, context: AgentContext) -> Dict[str, Any]:
        """Process an incident resolution query.

        Args:
            query: incident details as "key: value" lines (see
                ``_parse_incident_query``).
            context: shared context; remembered devices are used as affected
                CIs when the query names none. Receives the retrieved
                knowledge, the agent message, the last-agent marker and an
                ``"incident"`` entity.

        Returns:
            ``{"summary", "sources", "incident_details"}`` plus
            ``"action_items"`` when any were found, or ``{"error": ...}`` when
            no RAG system is attached.
        """
        logger.info(f"IncidentResolutionAgent processing query: {query}")

        # Parse incident parameters
        incident_id, title, description, status, priority, affected_cis = self._parse_incident_query(query)

        # Use devices from memory if no CIs specified
        if not affected_cis and "device" in context.entity_memory:
            affected_devices = [
                # fall back to the id when the name is missing or blank
                device.get("name") or device_id
                for device_id, device in context.entity_memory["device"].items()
            ]
            if affected_devices:
                affected_cis = ", ".join(affected_devices)

        if not self.rag_system:
            return {"error": "RAG system not available"}

        # Use RAG system to retrieve knowledge
        formatted_query = f"Incident ID: {incident_id}\nTitle: {title}\nDescription: {description}\nStatus: {status}\nPriority: {priority}\nAffected CIs: {affected_cis}"
        rag_result = self.rag_system.query("incident_resolution", formatted_query)
        context.add_knowledge("incident_resolution", rag_result)

        response = {
            "summary": rag_result["response"],
            "sources": rag_result["sources"],
            "incident_details": {
                "id": incident_id,
                "title": title,
                "status": status,
                "priority": priority,
                # Split on bare commas so "a,b" and "a, b" both give ["a", "b"].
                "affected_cis": [ci.strip() for ci in affected_cis.split(",") if ci.strip()]
            }
        }

        # Extract action items if any
        action_items = self._extract_action_items(rag_result["response"])
        if action_items:
            response["action_items"] = action_items

        # Update context
        context.add_message("agent", json.dumps(response))
        context.set_last_agent("incident_resolution")

        # Remember this incident in context
        context.remember_entity("incident", incident_id, {
            "id": incident_id,
            "title": title,
            "status": status,
            "priority": priority
        })

        return response

    def _parse_incident_query(self, query: str) -> tuple:
        """Parse incident details from query.

        Each "key: value" line is matched against the known fields. Lines with
        an empty value are ignored so the defaults stay in place.

        Returns:
            ``(incident_id, title, description, status, priority,
            affected_cis)``; defaults are ``"INC-001"``, ``"Network Issue"``,
            ``""``, ``"open"``, ``"medium"`` and ``""``.
        """
        incident_id = "INC-001"
        title = "Network Issue"
        description = ""
        status = "open"
        priority = "medium"
        affected_cis = ""

        for line in query.split('\n'):
            line = line.strip()
            if ":" not in line:
                continue

            key, value = line.split(":", 1)
            key = key.strip().lower()
            value = value.strip()
            if not value:
                continue

            # The id test works on whole words and comes last: a plain
            # substring test for "id" also hits "incIDent", which would send
            # "Incident Title: ..." or "Incident Status: ..." to the id field.
            key_words = re.split(r"[^a-z0-9]+", key)
            if "title" in key:
                title = value
            elif "description" in key:
                description = value
            elif "status" in key:
                status = value.lower()
            elif "priority" in key:
                priority = value.lower()
            elif "affected" in key and "ci" in key:
                affected_cis = value
            elif "id" in key_words or key in ("incident", "incident number"):
                incident_id = value

        return incident_id, title, description, status, priority, affected_cis

    def _split_action_fields(self, item: str) -> tuple:
        """Split one action item into ``(action, owner, deadline)``.

        Owner (``owner:`` / ``assigned to:``) and deadline (``by:`` /
        ``deadline:`` / ``due:``) markers are located case-insensitively in the
        original text, so the returned strings keep their casing. A marker's
        value runs to the next comma or the next marker. The action is the text
        before the first marker that carried a value.
        """
        fields = {"owner": "Unassigned", "deadline": "Not specified"}
        seen = set()
        cut = len(item)

        markers = list(self._FIELD_MARKER.finditer(item))
        for index, marker in enumerate(markers):
            value_end = markers[index + 1].start() if index + 1 < len(markers) else len(item)
            value = item[marker.end():value_end].split(",", 1)[0].strip()
            if not value:
                continue  # a marker without a value leaves the action untouched
            cut = min(cut, marker.start())
            kind = marker.lastgroup
            if kind not in seen:  # first occurrence of each field wins
                fields[kind] = value
                seen.add(kind)

        action = item[:cut]
        if cut < len(item):
            # drop the separator that preceded the first marker ("..., owner: x")
            action = action.rstrip(" \t,;-(")
        return action.strip(), fields["owner"], fields["deadline"]

    def _extract_action_items(self, text: str) -> List[Dict[str, str]]:
        """Extract action items from incident resolution text.

        Scanning starts at the first line that contains one of the action
        section headings together with a colon; from that line on, every
        bulleted or numbered line becomes one action item.

        Returns:
            A list of ``{"action", "owner", "deadline"}`` dicts; owner defaults
            to ``"Unassigned"`` and deadline to ``"Not specified"``.
        """
        action_items = []
        in_action_section = False

        for raw_line in text.split('\n'):
            stripped = raw_line.strip()

            # Check if we're entering an action section (it is never left again)
            if not in_action_section:
                lowered = stripped.lower()
                if ":" in lowered and any(section in lowered for section in self._ACTION_SECTIONS):
                    in_action_section = True
            if not in_action_section:
                continue

            item = self._LIST_ITEM.match(stripped)
            if not item:
                continue

            action, owner, deadline = self._split_action_fields(item.group(1).strip())
            action_items.append({
                "action": action,
                "owner": owner,
                "deadline": deadline
            })

        return action_items

class MCPRegistry:
    """Central registry for MCP agents.

    Keeps every registered agent keyed by its ``agent_id``, hands the shared
    RAG system to each of them, and routes queries to the agent whose type
    best matches the query text.
    """

    # Keywords that hint at each agent type. They are matched as lowercase
    # substrings of the query, so "router" also matches "routers".
    _ROUTING_KEYWORDS = {
        "troubleshooting": ["problem", "issue", "error", "not working", "troubleshoot", "fix", "broken"],
        "device_search": ["find", "search", "device", "router", "switch", "firewall", "topology"],
        "knowledge_base": ["what is", "how to", "explain", "documentation", "best practice"],
        "observability": ["monitor", "metrics", "performance", "trend", "utilization", "health"],
        "incident_resolution": ["incident", "outage", "resolution", "root cause", "impact"]
    }

    # Explicit field markers that clearly indicate an agent type. Checked in
    # order; only the first matching entry gets the bonus.
    _FORMAT_MARKERS = (
        ("incident_resolution", ("incident id:",)),
        ("observability", ("ci types:", "metrics:")),
        ("knowledge_base", ("document type:",)),
    )
    _FORMAT_MARKER_BONUS = 10

    def __init__(self):
        self.agents = {}
        self.rag_system = None
        # Shared conversation context used whenever process_query() is called
        # without an explicit context.
        self.default_context = AgentContext()

    def register_agent(self, agent: BaseAgent):
        """Register an agent in the system.

        An agent registered under an existing ``agent_id`` replaces the
        previous one. If a RAG system is already configured it is passed to
        the new agent immediately.
        """
        self.agents[agent.agent_id] = agent
        # Identity test: a valid RAG object may still evaluate as falsy.
        if self.rag_system is not None:
            agent.set_rag_system(self.rag_system)
        logger.info(f"Registered agent: {agent.agent_id} ({agent.agent_type})")

    def set_rag_system(self, rag_system):
        """Set the RAG system for the registry and all registered agents"""
        self.rag_system = rag_system
        for agent in self.agents.values():
            agent.set_rag_system(rag_system)
        logger.info("RAG system configured for all agents")

    def get_agent(self, agent_id: str) -> Optional[BaseAgent]:
        """Get an agent by ID, or None when the ID is unknown"""
        return self.agents.get(agent_id)

    def get_agent_by_type(self, agent_type: str) -> Optional[BaseAgent]:
        """Get the first registered agent of a specific type, or None"""
        for agent in self.agents.values():
            if agent.agent_type == agent_type:
                return agent
        return None

    def find_agent_for_capability(self, capability: str) -> Optional[BaseAgent]:
        """Find the first registered agent whose can_handle() accepts the capability"""
        for agent in self.agents.values():
            if agent.can_handle(capability):
                return agent
        return None

    def find_agent_for_query(self, query: str) -> Optional[BaseAgent]:
        """Find the most appropriate agent for a query.

        Each agent type scores one point per routing keyword found in the
        query, plus a large bonus when the query carries an explicit field
        marker such as "incident id:". Types are tried from the highest score
        down, so an unregistered best match falls through to the next-best
        registered type. If nothing scores (or no scored type is registered)
        the knowledge base agent is used, and failing that the first
        registered agent.

        Returns:
            The selected agent, or None when the registry is empty.
        """
        # Simple keyword-based routing
        query_lower = query.lower()

        scores = {
            agent_type: sum(1 for keyword in keywords if keyword in query_lower)
            for agent_type, keywords in self._ROUTING_KEYWORDS.items()
        }

        for agent_type, markers in self._FORMAT_MARKERS:
            if any(marker in query_lower for marker in markers):
                scores[agent_type] += self._FORMAT_MARKER_BONUS
                break

        # sorted() is stable, so ties keep the declaration order of the
        # routing table (the same winner max() would pick).
        ranked_types = sorted(
            (agent_type for agent_type, score in scores.items() if score > 0),
            key=scores.get,
            reverse=True,
        )
        for agent_type in ranked_types:
            agent = self.get_agent_by_type(agent_type)
            if agent is not None:
                logger.info(f"Query routed to {agent_type} agent (score: {scores[agent_type]})")
                return agent
            logger.warning(
                f"No {agent_type} agent registered (score: {scores[agent_type]}), trying next best match"
            )

        # If no agent scores high enough to handle the query, fall back to the Knowledge Base Agent
        logger.info("No specific agent match, falling back to knowledge base agent")
        knowledge_agent = self.get_agent_by_type("knowledge_base")
        if knowledge_agent is not None:
            return knowledge_agent

        # Last resort: the first agent in the registry, if there is one
        return next(iter(self.agents.values()), None)

    def process_query(self, query: str, agent_type: Optional[str] = None, context: Optional[AgentContext] = None) -> Dict[str, Any]:
        """Process a query with the most appropriate or specified agent.

        Args:
            query: The user query text.
            agent_type: Optional agent type to use directly. When no agent of
                that type is registered, keyword routing is used instead.
            context: Conversation context; defaults to the registry's shared
                default context.

        Returns:
            The agent's result with "agent_id" and "agent_type" added, or
            {"error": ...} when no agent is available.
        """
        if context is None:
            context = self.default_context

        # Find the appropriate agent
        agent = None
        if agent_type:
            # If agent type is specified, use it directly
            agent = self.get_agent_by_type(agent_type)
            if agent is None:
                logger.warning(f"Requested agent type '{agent_type}' is not registered, routing by query instead")

        if not agent:
            # Otherwise find the most appropriate agent
            agent = self.find_agent_for_query(query)

        if not agent:
            return {"error": "No suitable agent found"}

        # Update context with the user query
        context.add_message("user", query)

        # Process the query
        result = agent.process(query, context)

        # Enrich result with agent information
        result["agent_id"] = agent.agent_id
        result["agent_type"] = agent.agent_type

        return result

# Factory function to create a complete MCP system
def create_mcp_system(rag_system, langgraph_agent=None):
    """Build a registry populated with the five standard agents.

    Args:
        rag_system: RAG system handed to the registry once every agent is registered.
        langgraph_agent: Optional LangGraph agent passed to the device search agent.

    Returns:
        The populated MCPRegistry.
    """
    registry = MCPRegistry()

    # Instantiate every agent first, then register them in this same order.
    agents = (
        TroubleshootingAgent("Troubleshoot-IDP"),
        DeviceSearchAgent("Device-search-IDP", langgraph_agent),
        KnowledgeBaseAgent("KnowledgeBase-IDP"),
        ObservabilityAgent("Observability-IDP"),
        IncidentResolutionAgent("Incident-IDP"),
    )
    for agent in agents:
        registry.register_agent(agent)

    # Configure the RAG system last so it reaches all registered agents.
    registry.set_rag_system(rag_system)

    return registry


Writing full_mcp_implementation.py


### LangGraph-Enabled Device Search Agent provides
- Structured workflow management (node/edge definition)
- Pipeline stages: query parsing → device search → topology analysis of device relationships → result formatting

In [6]:
%%writefile langgraph_device_search_agent.py
import os
import json
import logging
from typing import Dict, List, Any, Optional, Union
from copy import deepcopy

# Import required libraries
from pydantic import BaseModel, Field
from langchain.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, END
from langchain_community.llms import HuggingFacePipeline

# Configure logging
# NOTE: logging.basicConfig() does nothing when the root logger already has
# handlers, so if another module configured logging first, its format applies.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the agent class defined below
logger = logging.getLogger("LangGraphDeviceSearchAgent")

# Define the device search state with improved type handling
class DeviceSearchState(BaseModel):
    """State for the device search workflow.

    This is the state type the StateGraph is built with. The workflow nodes
    visible in this module take a state, deep-copy it, and return the updated
    copy.
    """
    # Raw search request: a criteria dict, a free-text string, or None
    query: Union[Dict[str, Any], str, None] = Field(default_factory=dict)
    # Criteria derived from `query` by parse_query (a string becomes {"description": ...})
    search_criteria: Dict[str, Any] = Field(default_factory=dict)
    # Standardized device dicts produced by search_devices
    found_devices: List[Dict[str, Any]] = Field(default_factory=list)
    # The next three maps are filled by analyze_topology, keyed by a found device's ci_id
    upstream_devices: Dict[str, List[Dict[str, Any]]] = Field(default_factory=dict)
    downstream_devices: Dict[str, List[Dict[str, Any]]] = Field(default_factory=dict)
    affected_services: Dict[str, List[Dict[str, Any]]] = Field(default_factory=dict)
    # Error message recorded by a node that failed; None means no error so far
    error: Optional[str] = None
    # Workflow status label; starts as "initialized" (not updated by the nodes visible here)
    status: str = "initialized"
    # Knowledge-base context text stored by search_devices when the RAG lookup succeeds
    rag_context: Optional[str] = None

class LangGraphDeviceSearchAgent:
    """LangGraph-based Device Search Agent integrated with RAG"""

    def __init__(self, llm, rag_system=None):
        """Store the collaborators and compile the search workflow.

        Args:
            llm: Language model; the workflow calls its ``invoke`` method with a prompt.
            rag_system: Optional retrieval system queried for extra context;
                ``None`` disables retrieval.
        """
        self.llm, self.rag_system = llm, rag_system
        # The graph is built once, at construction time.
        self.graph = self._build_graph()
        logger.info("LangGraph Device Search Agent initialized")

    def _build_graph(self):
        """Build and compile the LangGraph workflow.

        Flow: parse_query -> search_devices -> analyze_topology ->
        format_results -> END. ``parse_query`` and ``search_devices`` end the
        run early when they record an error on the state.

        Returns:
            The compiled graph.
        """
        # Create the graph with the state
        workflow = StateGraph(DeviceSearchState)

        # Add nodes for each step
        workflow.add_node("parse_query", self.parse_query)
        workflow.add_node("search_devices", self.search_devices)
        workflow.add_node("analyze_topology", self.analyze_topology)
        workflow.add_node("format_results", self.format_results)

        def continue_unless_error(next_node):
            """Build a router that proceeds to next_node unless state.error is set."""
            def route(state):
                return next_node if state.error is None else END
            return route

        # parse_query and search_devices get ONLY conditional edges. LangGraph
        # follows every outgoing edge of a node, so pairing these with plain
        # add_edge() calls to the same targets would run the next step even
        # after an error and defeat the early exit.
        workflow.add_conditional_edges(
            "parse_query",
            continue_unless_error("search_devices"),
            {"search_devices": "search_devices", END: END}
        )
        workflow.add_conditional_edges(
            "search_devices",
            continue_unless_error("analyze_topology"),
            {"analyze_topology": "analyze_topology", END: END}
        )

        # The remaining steps always run in sequence
        workflow.add_edge("analyze_topology", "format_results")
        workflow.add_edge("format_results", END)

        # Set the entry point
        workflow.set_entry_point("parse_query")

        # Compile the graph
        return workflow.compile()

    def parse_query(self, state: DeviceSearchState) -> DeviceSearchState:
        """Turn the raw query into a search-criteria dict.

        A string becomes ``{"description": <text>}``, a dict is used as is,
        ``None`` becomes an empty dict, and any other value is stored as
        ``{"raw_value": str(value)}``.

        Returns:
            A deep copy of ``state`` with ``search_criteria`` filled in, or
            with ``error`` set if something went wrong.
        """
        logger.info("Parsing query")
        new_state = deepcopy(state)

        try:
            raw_query = state.query
            if isinstance(raw_query, str):
                logger.info(f"Converting string query to dict: {state.query}")
                criteria = {"description": raw_query}
            elif isinstance(raw_query, dict):
                # Already structured: use the dict directly
                criteria = raw_query
            elif raw_query is None:
                criteria = {}
            else:
                # Anything else is kept as its string form
                criteria = {"raw_value": str(raw_query)}
            new_state.search_criteria = criteria
        except Exception as e:
            logger.error(f"Error in parse_query: {e}")
            new_state.error = f"Error parsing query: {str(e)}"

        return new_state

    def search_devices(self, state: DeviceSearchState) -> DeviceSearchState:
        """Search for devices matching criteria with case-insensitive field handling"""
        logger.info("Searching devices")
        new_state = deepcopy(state)

        try:
            # Extract query text for RAG and tracking
            query_text = ""
            if isinstance(state.query, str):
                query_text = state.query
            elif isinstance(state.query, dict) and "description" in state.query:
                query_text = state.query["description"]
            else:
                query_text = json.dumps(state.search_criteria)

            # Get RAG context if available
            context = "Use your knowledge of network topologies."
            if self.rag_system:
                try:
                    # Query the RAG system
                    rag_result = self.rag_system.query("device_search", query_text)
                    if rag_result and "response" in rag_result:
                        # Make the RAG response safe for string formatting
                        rag_response = rag_result['response']
                        safe_response = rag_response.replace("{", "{{").replace("}", "}}")

                        context = f"Use this information from our knowledge base:\n{safe_response}"
                        new_state.rag_context = context

                        logger.info(f"Retrieved {len(rag_result.get('sources', []))} knowledge base entries")
                except Exception as rag_error:
                    logger.warning(f"Error using RAG system: {rag_error}")

            # Use RAG and LLM to simulate device search
            prompt = ChatPromptTemplate.from_template("""<|im_start|>system
You are a network topology expert who can search and find devices in a network.
<|im_end|>
<|im_start|>user
Search for network devices matching these criteria:
{criteria}

{context}

Generate a list of found devices with these EXACT properties (use exactly these field names, all lowercase):
- ci_id: Device identifier (use format like "R001" for routers, "S001" for switches, "FW001" for firewalls)
- name: Descriptive name
- ci_type: Device type (router, switch, firewall, etc.) - MUST BE ALL LOWERCASE
- status: Current status (active, inactive, warning, etc.)
- location: Physical location
- importance: Importance in the network (use values like "high", "medium", "low", "critical")

Respond ONLY with a JSON list of devices that match the criteria. Use EXACTLY the field names shown above.
<|im_end|>
<|im_start|>assistant
""")

            # Format the criteria for the prompt
            criteria_str = json.dumps(state.search_criteria, indent=2)

            # Handle format string issues
            safe_criteria_str = criteria_str.replace("{", "{{").replace("}", "}}")

            # Generate device list
            try:
                formatted_prompt = prompt.format(criteria=safe_criteria_str, context=context)
                response = self.llm.invoke(formatted_prompt)
            except ValueError as ve:
                logger.warning(f"Format string error: {ve}, using alternative formatting")
                template = prompt.template
                safe_template = template.replace("{criteria}", criteria_str).replace("{context}",
                                          "Use your knowledge of network devices to find matching devices.")
                response = self.llm.invoke(safe_template)

            # Use JSON extraction and parsing
            devices = self._extract_and_fix_json(response, query_text)

            # Log received devices for debugging
            logger.info(f"Devices before standardization: {devices}")

            # Case-insensitive field standardization
            validated_devices = []
            for device in devices:
                # Create a lowercase keys dictionary for case-insensitive lookup
                lower_device = {k.lower(): v for k, v in device.items()}

                # Now extract fields with fallbacks using the lowercase dictionary
                standardized_device = {
                    "ci_id": None,
                    "name": None,
                    "ci_type": None,
                    "status": None,
                    "location": None,
                    "importance": None
                }

                # ci_id field
                if "ci_id" in lower_device:
                    standardized_device["ci_id"] = lower_device["ci_id"]
                elif "id" in lower_device:
                    standardized_device["ci_id"] = lower_device["id"]
                else:
                    # Determine ID based on type if possible
                    if "firewall" in query_text.lower():
                        standardized_device["ci_id"] = "FW001"
                    elif "router" in query_text.lower():
                        standardized_device["ci_id"] = f"R{len(validated_devices)+1:03d}"
                    elif "switch" in query_text.lower():
                        standardized_device["ci_id"] = f"S{len(validated_devices)+1:03d}"
                    else:
                        standardized_device["ci_id"] = f"DEV{len(validated_devices)+1:03d}"

                # name field
                if "name" in lower_device:
                    standardized_device["name"] = lower_device["name"]
                elif "description" in lower_device:
                    standardized_device["name"] = lower_device["description"]
                else:
                    device_type = "Device"
                    if "ci_type" in lower_device:
                        device_type = lower_device["ci_type"]
                    elif "type" in lower_device:
                        device_type = lower_device["type"]
                    standardized_device["name"] = f"{device_type} {len(validated_devices)+1}"

                # ci_type field - Standardization for Gradio display
                if "ci_type" in lower_device:
                    standardized_device["ci_type"] = str(lower_device["ci_type"]).lower()
                elif "type" in lower_device:
                    standardized_device["ci_type"] = str(lower_device["type"]).lower()
                else:
                    # Determine type from ID if possible
                    if standardized_device["ci_id"].startswith("R"):
                        standardized_device["ci_type"] = "router"
                    elif standardized_device["ci_id"].startswith("S"):
                        standardized_device["ci_type"] = "switch"
                    elif standardized_device["ci_id"].startswith("FW"):
                        standardized_device["ci_type"] = "firewall"
                    else:
                        standardized_device["ci_type"] = "unknown"

                # status field
                if "status" in lower_device:
                    standardized_device["status"] = str(lower_device["status"])
                elif "state" in lower_device:
                    standardized_device["status"] = str(lower_device["state"])
                else:
                    standardized_device["status"] = "active"

                # location field
                if "location" in lower_device:
                    standardized_device["location"] = str(lower_device["location"])
                elif "site" in lower_device:
                    standardized_device["location"] = str(lower_device["site"])
                else:
                    standardized_device["location"] = "Unknown"

                # importance field
                if "importance" in lower_device:
                    standardized_device["importance"] = str(lower_device["importance"]).lower()
                elif "criticality" in lower_device:
                    standardized_device["importance"] = str(lower_device["criticality"]).lower()
                elif "priority" in lower_device:
                    standardized_device["importance"] = str(lower_device["priority"]).lower()
                else:
                    standardized_device["importance"] = "medium"

                # Make sure all values are strings to avoid error
                for key, value in standardized_device.items():
                    if value is None:
                        if key == "ci_id":
                            standardized_device[key] = f"DEV{len(validated_devices)+1:03d}"
                        elif key == "name":
                            standardized_device[key] = "Unknown Device"
                        elif key == "ci_type":
                            standardized_device[key] = "unknown"
                        elif key == "status":
                            standardized_device[key] = "active"
                        elif key == "location":
                            standardized_device[key] = "Unknown"
                        elif key == "importance":
                            standardized_device[key] = "medium"
                    elif not isinstance(value, str):
                        standardized_device[key] = str(value)

                logger.info(f"Standardized device: {standardized_device}")
                validated_devices.append(standardized_device)

            new_state.found_devices = validated_devices
            logger.info(f"Successfully processed {len(validated_devices)} standardized devices")

            return new_state

        except Exception as e:
            logger.error(f"Error in search_devices: {e}")
            new_state.error = f"Error searching devices: {str(e)}"
            return new_state

    def _extract_and_fix_json(self, response, query_text=""):
        """Extract a list of device dicts from an LLM response.

        The JSON text is located in the response, parsed as is, and repaired
        only if parsing fails. Valid JSON is never touched by the repair
        regexes, which could otherwise rewrite text inside string values.
        If the whole document cannot be parsed, individual objects and then
        individual fields are salvaged. When everything fails, fallback
        devices derived from the query text are returned.

        Args:
            response: LLM output; a string, or an object with a ``content`` attribute.
            query_text: Original query text, used to choose fallback devices.

        Returns:
            A list of device dicts (possibly empty when the LLM returned ``[]``).
        """
        import json

        # Chat models return message objects rather than plain strings
        if not isinstance(response, str):
            response = str(getattr(response, "content", response))

        json_str = self._locate_json_text(response)

        # If JSON extraction was not possible, use fallback
        if not json_str:
            logger.warning("Couldn't extract JSON from response, using fallback")
            return self._get_fallback_devices(query_text)

        logger.info(f"Extracted JSON string: {json_str[:100]}...")

        try:
            tried = set()
            for label, candidate in self._repair_json_text(json_str):
                if candidate in tried:
                    # This repair step changed nothing; no point parsing again
                    continue
                tried.add(candidate)
                try:
                    parsed_data = json.loads(candidate)
                except json.JSONDecodeError as e:
                    logger.warning(f"JSON parse failed ({label}): {e}")
                    continue
                return self._devices_from_parsed(parsed_data, query_text)

            logger.warning("Whole-document parsing failed, attempting line-by-line parsing")
            devices = self._salvage_devices(json_str)
            if devices:
                return devices

            # If all parsing attempts failed, use fallback
            logger.warning("All JSON parsing attempts failed, using fallback devices")
            return self._get_fallback_devices(query_text)

        except Exception as e:
            logger.error(f"Error cleaning up JSON: {e}")
            return self._get_fallback_devices(query_text)

    def _locate_json_text(self, response):
        """Return the part of response that looks like a JSON list, or None."""
        import re

        # First try: Look for JSON code block
        fenced = re.search(r'```(?:json)?\s*\n([\s\S]*?)\n```', response, re.DOTALL)
        if fenced:
            return fenced.group(1).strip()

        # Second try: Look for array pattern
        array = re.search(r'(\[\s*\{.*\}\s*\])', response, re.DOTALL)
        if array:
            return array.group(1).strip()

        # Third try: Scan for first [ to last ]
        start = response.find('[')
        end = response.rfind(']')
        if start != -1 and end > start:
            return response[start:end+1].strip()

        # Last resort: everything from the first [ onwards (e.g. a truncated list)
        clean_response = response.strip()
        start = clean_response.find('[')
        if start != -1:
            return clean_response[start:].strip()

        return None

    def _repair_json_text(self, text):
        """Return (label, text) parse candidates, from untouched to most heavily repaired."""
        import re

        # Remove control characters
        repaired = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
        # Fix common JSON formatting issues
        repaired = re.sub(r',\s*\}', '}', repaired)  # Fix trailing commas in objects
        repaired = re.sub(r',\s*\]', ']', repaired)  # Fix trailing commas in arrays
        repaired = re.sub(r'([{,]\s*)(\w+)(\s*:)', r'\1"\2"\3', repaired)  # Quote property names

        return [
            ("as extracted", text),
            ("after formatting repairs", repaired),
            ("after single quote replacement", repaired.replace("'", '"')),
        ]

    def _devices_from_parsed(self, parsed_data, query_text=""):
        """Turn successfully parsed JSON into a device list, or fallback devices."""
        # A single object is treated as a one-device list
        if isinstance(parsed_data, dict):
            return [parsed_data]
        if not isinstance(parsed_data, list):
            return self._get_fallback_devices(query_text)
        if not parsed_data:
            # The LLM explicitly reported no devices
            return parsed_data

        # Check for field/value format (common in some LLM outputs)
        first_item = parsed_data[0]
        if isinstance(first_item, dict) and "field" in first_item and "value" in first_item:
            logger.info("Detected field/value format, converting to standard format")

            # Map fields to standard names
            field_aliases = {
                "description": "name", "name": "name",
                "ci_id": "ci_id", "id": "ci_id", "device id": "ci_id",
                "ci_type": "ci_type", "type": "ci_type", "device type": "ci_type",
                "status": "status",
                "location": "location",
                "importance": "importance"
            }
            device_fields = {}
            for item in parsed_data:
                if not isinstance(item, dict):
                    continue
                target = field_aliases.get(str(item.get("field", "")).lower())
                if target:
                    device_fields[target] = item.get("value", "")

            if not device_fields:
                # If we couldn't extract fields, use fallback
                return self._get_fallback_devices(query_text)

            # Use "FW001" as default ci_id if query contains "firewall"
            default_ci_id = "FW001" if "firewall" in (query_text or "").lower() else "DEV001"
            return [{
                "ci_id": device_fields.get("ci_id", default_ci_id),
                "name": device_fields.get("name", "Unknown Device"),
                "ci_type": device_fields.get("ci_type", "unknown"),
                "status": device_fields.get("status", "active"),
                "location": device_fields.get("location", "Unknown"),
                "importance": device_fields.get("importance", "medium")
            }]

        # Accept the same key spellings search_devices understands:
        # case-insensitive names plus the "id" and "description" aliases.
        identifying_keys = {"name", "ci_id", "id", "description"}

        def looks_like_device(item):
            return isinstance(item, dict) and any(
                str(key).lower() in identifying_keys for key in item
            )

        if all(looks_like_device(item) for item in parsed_data):
            return parsed_data

        # Not device objects, use fallback
        logger.warning("Parsed JSON doesn't contain proper device objects")
        return self._get_fallback_devices(query_text)

    def _salvage_devices(self, json_str):
        """Recover devices from JSON that cannot be parsed as a whole.

        Tries every brace-delimited object on its own first, then falls back
        to pulling ci_id / name / type values out with regular expressions.
        Returns an empty list when nothing could be recovered.
        """
        import re
        import json

        # Try to parse the JSON object by object
        try:
            devices = []
            for match in re.finditer(r'\{[^{}]*\}', json_str):
                device_str = match.group(0)
                device_obj = None
                for _label, candidate in self._repair_json_text(device_str):
                    try:
                        device_obj = json.loads(candidate)
                        break
                    except json.JSONDecodeError:
                        continue
                if isinstance(device_obj, dict):
                    devices.append(device_obj)
                else:
                    logger.warning(f"Skipping invalid device object: {device_str[:50]}...")

            if devices:
                logger.info(f"Successfully parsed {len(devices)} devices from line-by-line approach")
                return devices
            logger.warning("Line-by-line parsing yielded no valid devices")
        except Exception as e3:
            logger.warning(f"Line-by-line parsing failed: {e3}")

        # Manual extraction with regex - as last resort solution
        try:
            text = self._repair_json_text(json_str)[-1][1]
            ci_ids = re.findall(r'["\']ci_id["\']\s*:\s*["\']([^"\']+)["\']', text)
            names = re.findall(r'["\']name["\']\s*:\s*["\']([^"\']+)["\']', text)
            types = re.findall(r'["\'](?:ci_type|type)["\']\s*:\s*["\']([^"\']+)["\']', text)

            if ci_ids or names:
                devices = []
                # Use the longest list as our base
                max_length = max(len(ci_ids), len(names), len(types))

                for i in range(max_length):
                    devices.append({
                        "ci_id": ci_ids[i] if i < len(ci_ids) else f"DEV{i+1:03d}",
                        "name": names[i] if i < len(names) else f"Device {i+1}",
                        "ci_type": types[i] if i < len(types) else "unknown",
                        "status": "active",
                        "importance": "medium",
                        "location": "Unknown"
                    })

                logger.info(f"Manually constructed {len(devices)} devices from regex extraction")
                return devices
        except Exception as e4:
            logger.warning(f"Manual extraction failed: {e4}")

        return []

    def _get_fallback_devices(self, query_text):
        """Generate fallback devices based on query text"""
        devices = []

        # Check for common device types in the query
        if "router" in query_text.lower():
            devices.append({
                "ci_id": "R001",
                "name": "Core Router 1",
                "ci_type": "router",
                "status": "active",
                "location": "Data Center",
                "importance": "critical"
            })

            if "all" in query_text.lower():
                devices.append({
                    "ci_id": "R002",
                    "name": "Core Router 2",
                    "ci_type": "router",
                    "status": "active",
                    "location": "Backup Data Center",
                    "importance": "critical"
                })

        elif "switch" in query_text.lower():
            devices.append({
                "ci_id": "S001",
                "name": "Distribution Switch 1",
                "ci_type": "switch",
                "status": "active",
                "location": "Main Office",
                "importance": "high"
            })

            if "all" in query_text.lower():
                devices.append({
                    "ci_id": "S002",
                    "name": "Access Switch 1",
                    "ci_type": "switch",
                    "status": "active",
                    "location": "Branch Office",
                    "importance": "medium"
                })

        elif "firewall" in query_text.lower():
            devices.append({
                "ci_id": "FW001",
                "name": "Edge Firewall",
                "ci_type": "firewall",
                "status": "active",
                "location": "Data Center",
                "importance": "critical"
            })

        # If nothing matched or we need more devices, add a generic one
        if not devices or "all" in query_text.lower():
            devices.append({
                "ci_id": "DEV001",
                "name": "Generic Network Device",
                "ci_type": "unknown",
                "status": "active",
                "location": "Network Core",
                "importance": "medium"
            })

        return devices

    def analyze_topology(self, state: DeviceSearchState) -> DeviceSearchState:
        """Attach upstream devices, downstream devices and affected services to each found device.

        Works on a deep copy of ``state``. Devices without a ``ci_id`` are
        skipped; results are stored in the three per-device maps keyed by
        ``ci_id``.

        Returns:
            The updated copy, with ``error`` set if any step raised.
        """
        logger.info("Analyzing topology")
        new_state = deepcopy(state)

        try:
            for found in new_state.found_devices:
                ci_id = found.get("ci_id")
                if not ci_id:
                    # Nothing to key the results by
                    continue

                new_state.upstream_devices[ci_id] = self._generate_connected_devices(found, "upstream")
                new_state.downstream_devices[ci_id] = self._generate_connected_devices(found, "downstream")
                new_state.affected_services[ci_id] = self._generate_affected_services(found)
        except Exception as e:
            import traceback
            logger.error(f"Error in analyze_topology: {e}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            new_state.error = f"Error analyzing topology: {str(e)}"

        return new_state

    def _generate_connected_devices(self, device, direction):
        """Generate connected devices with consistent field names"""
        # Get standardized device information
        device_type = device.get("ci_type", "").lower()
        device_id = device.get("ci_id", "")
        device_name = device.get("name", "unknown-device")

        result = []
        try:
            # Generate different connections based on device type
            if device_type == "router":
                if direction == "upstream":
                    # Router upstream connections
                    numeric_part = 1
                    if device_id.startswith('R') and device_id[1:].isdigit():
                        numeric_part = int(device_id[1:])

                    upstream_id = f"R{numeric_part - 1:03d}" if numeric_part > 1 else "WAN001"
                    upstream_name = f"core-router-{numeric_part - 1:02d}" if numeric_part > 1 else "wan-edge-01"

                    result = [{
                        "ci_id": upstream_id,
                        "name": upstream_name,
                        "ci_type": "router",
                        "status": "active",
                        "location": "Network Core",
                        "importance": "critical"
                    }]
                else:
                    # Router downstream connections
                    switch_suffix = device_id.replace("R", "").replace("-", "")[:3]
                    if not switch_suffix.isalnum():
                        switch_suffix = "001"

                    result = [
                        {
                            "ci_id": f"S{switch_suffix}A",
                            "name": f"distribution-switch-{device_name[:3]}a",
                            "ci_type": "switch",
                            "status": "active",
                            "location": device.get("location", "Unknown"),
                            "importance": "high"
                        },
                        {
                            "ci_id": f"S{switch_suffix}B",
                            "name": f"distribution-switch-{device_name[:3]}b",
                            "ci_type": "switch",
                            "status": "active",
                            "location": device.get("location", "Unknown"),
                            "importance": "high"
                        }
                    ]
            elif device_type == "switch":
                if direction == "upstream":
                    # Switches connect upstream to routers or other switches
                    if "access" in str(device_name).lower():
                        # Create a descriptive ID
                        dist_id = device_id.replace("S", "").split("1")[0]
                        if not dist_id:
                            dist_id = "01"

                        result = [
                            {
                                "ci_id": f"S{dist_id}0",
                                "name": f"distribution-switch-{dist_id}",
                                "ci_type": "switch",
                                "status": "active",
                                "location": device.get("location", "Unknown"),
                                "importance": "high"
                            }
                        ]
                    else:
                        # Create a sensible router ID
                        router_suffix = device_id.replace("S", "").replace("-", "")[:2]
                        if not router_suffix.isalnum():
                            router_suffix = "01"  # Fallback suffix

                        result = [
                            {
                                "ci_id": f"R{router_suffix}",
                                "name": f"core-router-{router_suffix}",
                                "ci_type": "router",
                                "status": "active",
                                "location": device.get("location", "Unknown"),
                                "importance": "critical"
                            }
                        ]
                else:
                    # Downstream devices
                    device_suffix = device_id.replace("S", "").replace("-", "")[:3]
                    if not device_suffix.isalnum():
                        device_suffix = "001"  # Fallback

                    if "access" in str(device_name).lower():
                        result = [
                            {
                                "ci_id": f"SRV{device_suffix}A",
                                "name": f"server-{device_suffix}-rack-a",
                                "ci_type": "server",
                                "status": "active",
                                "location": device.get("location", "Unknown"),
                                "importance": "high"
                            },
                            {
                                "ci_id": f"SRV{device_suffix}B",
                                "name": f"server-{device_suffix}-rack-b",
                                "ci_type": "server",
                                "status": "active",
                                "location": device.get("location", "Unknown"),
                                "importance": "high"
                            }
                        ]
                    else:
                        # Distribution switches connect to access switches
                        result = [
                            {
                                "ci_id": f"S{device_suffix}1",
                                "name": f"access-switch-{device_suffix}-1",
                                "ci_type": "switch",
                                "status": "active",
                                "location": device.get("location", "Unknown"),
                                "importance": "medium"
                            },
                            {
                                "ci_id": f"S{device_suffix}2",
                                "name": f"access-switch-{device_suffix}-2",
                                "ci_type": "switch",
                                "status": "active",
                                "location": device.get("location", "Unknown"),
                                "importance": "medium"
                            }
                        ]
            elif device_type == "firewall":
                if direction == "upstream":
                    # Firewall connects upstream to router
                    result = [
                        {
                            "ci_id": "R001",
                            "name": "core-router-01",
                            "ci_type": "router",
                            "status": "active",
                            "location": "Network Core",
                            "importance": "critical"
                        }
                    ]
                else:
                  # Firewall connects downstream to DMZ and internal networks
                    result = [
                        {
                            "ci_id": "DMZ001",
                            "name": "dmz-switch-01",
                            "ci_type": "switch",
                            "status": "active",
                            "location": "DMZ",
                            "importance": "high"
                        },
                        {
                            "ci_id": "S001",
                            "name": "internal-switch-01",
                            "ci_type": "switch",
                            "status": "active",
                            "location": "Internal Network",
                            "importance": "high"
                        }
                    ]

            # If could not generate based on type, provide a  fallback
            if not result:
                hash_val = abs(hash(str(device_id))) % 1000  # Ensure positive hash value
                result = [
                    {
                        "ci_id": f"DEV{hash_val:03d}",
                        "name": f"connected-device-{direction}",
                        "ci_type": "generic",
                        "status": "active",
                        "location": "Unknown",
                        "importance": "medium"
                    }
                ]

        except Exception as e:
            # Log the error but don't break the workflow
            logger.warning(f"Error generating connected devices for {device_id}: {e}")
            # Return a generic device as fallback
            hash_val = abs(hash(str(device_id))) % 1000  # Ensure positive hash value
            result = [
                {
                    "ci_id": f"DEV{hash_val:03d}",
                    "name": f"connected-device-{direction}",
                    "ci_type": "generic",
                    "status": "active",
                    "location": "Unknown",
                    "importance": "medium"
                }
            ]

        return result

    def _generate_affected_services(self, device):
        """Generate affected services for a device with improved type handling"""
        device_type = device.get("ci_type", "")

        # Fix for the error: Check type before calling .lower()
        importance_value = device.get("importance", "")
        # Convert to string if it's not already
        if not isinstance(importance_value, str):
            importance_value = str(importance_value)
        importance = importance_value.lower()

        # Generate services based on device type and importance
        services = []

        if importance in ["critical", "high"] or "router" in str(device_type).lower():
            services.append({
                "service_id": "SVC001",
                "name": "Customer Portal",
                "status": "active",
                "criticality": "high"
            })

        if "router" in str(device_type).lower() or "firewall" in str(device_type).lower():
            services.append({
                "service_id": "SVC002",
                "name": "VPN Access",
                "status": "active",
                "criticality": "medium"
            })

        if "switch" in str(device_type).lower() and "distribution" in str(device.get("name", "")).lower():
            services.append({
                "service_id": "SVC003",
                "name": "Internal Applications",
                "status": "active",
                "criticality": "medium"
            })

        if "switch" in str(device_type).lower() and "access" in str(device.get("name", "")).lower():
            services.append({
                "service_id": "SVC004",
                "name": "Office Network",
                "status": "active",
                "criticality": "low"
            })

        if "firewall" in str(device_type).lower():
            services.append({
                "service_id": "SVC005",
                "name": "Security Services",
                "status": "active",
                "criticality": "critical"
            })

        return services

    def format_results(self, state: DeviceSearchState) -> DeviceSearchState:
        """Log the formatting step and return ``state`` untouched.

        No reshaping happens here; ``__call__`` builds the response dict from
        whatever the graph run returns.
        """
        logger.info("Formatting results")
        return state

    def __call__(self, input_data, mcp_context=None):
        """Run the device-search graph for one request and return a plain dict.

        Args:
            input_data: Either the query string itself or a dict carrying it
                under ``"query"``. Any other value is handed to
                ``DeviceSearchState`` unchanged.
            mcp_context: Accepted for interface compatibility; not read here.

        Returns:
            dict with ``success`` (bool), ``found_devices`` (list) and
            ``upstream_devices`` / ``downstream_devices`` /
            ``affected_services`` (dicts), plus ``error`` when the run reported
            one. The same keys are present when an exception is caught, so
            callers can index the payload without checking ``success`` first.
        """
        logger.info(f"Device search request: {input_data}")

        try:
            # Handle different input formats
            if isinstance(input_data, str):
                query = input_data
                logger.info(f"Direct string query: {query}")
            elif isinstance(input_data, dict) and "query" in input_data:
                query = input_data.get("query")
                logger.info(f"Query from dict: {query}")
            else:
                # Fallback: pass the raw input through
                # NOTE(review): relies on DeviceSearchState converting non-string queries -- confirm
                query = input_data
                logger.info(f"Using input directly as query: {type(query)}")

            initial_state = DeviceSearchState(query=query)

            # Run the graph
            result = self.graph.invoke(initial_state)
            logger.info(f"Graph execution result type: {type(result)}")

            # The graph may hand back a dict-like mapping or a state object;
            # pick one accessor so the payload is assembled in a single place.
            if hasattr(result, "get") and callable(result.get):
                logger.info("Processing result as dictionary-like object")
                read = result.get
            else:
                logger.info("Processing result as DeviceSearchState object")

                def read(field, default=None):
                    return getattr(result, field, default)

            error = read("error", None)
            formatted_result = {
                "success": error is None,
                "found_devices": read("found_devices", []),
                "upstream_devices": read("upstream_devices", {}),
                "downstream_devices": read("downstream_devices", {}),
                "affected_services": read("affected_services", {}),
            }
            if error:
                formatted_result["error"] = error

            return formatted_result

        except Exception as e:
            logger.error(f"Error in device search agent: {str(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            # Mirror the success-path schema so consumers never hit a missing key
            return {
                "success": False,
                "error": f"Error processing query: {str(e)}",
                "found_devices": [],
                "upstream_devices": {},
                "downstream_devices": {},
                "affected_services": {},
            }


Writing langgraph_device_search_agent.py


### Creating an Agentic RAG System - Specialized Agents Accomplish Tasks based on their domain expertise/definition

In [7]:
%%writefile rag_system_updated.py
import os
import logging
import re
import json
from pathlib import Path
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain_community.llms import HuggingFacePipeline
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# Configure logging with more detail
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("rag_system")

# Knowledge-base-backed RAG system: per-agent document collections, embeddings, vector stores, and retrieval
class NetworkRAGSystem:
    def __init__(self, knowledge_base_path="/content/agentic_rag-mcp_system/network_knowledge_base.txt"):
        self.doc_collections = {}
        self.vector_stores = {}
        self.llm = None
        self.initialized = False
        self.knowledge_base_path = knowledge_base_path

    def initialize(self):
        """Initialize the RAG system with better error handling"""
        if self.initialized:
            return True

        logger.info("Initializing RAG system...")

        # Test section mapping
        test_sections = ["TROUBLESHOOTING", "OBSERVABILITY", "DEVICE SEARCH", "KNOWLEDGE BASE", "INCIDENT", "DEVICE INVENTORY"]
        for section in test_sections:
            mapped = self._map_section_to_agent(section)
            logger.info(f"Test mapping: '{section}' → '{mapped}'")

        # Sequential initialization with verification
        if not self._load_documents():
            logger.error("Document loading failed, initialization incomplete")
            return False

        if not self._create_vector_stores():
            logger.error("Vector store creation failed, initialization incomplete")
            return False

        if not self._init_llm():
            logger.error("LLM initialization failed, initialization incomplete")
            return False

        self.initialized = True
        logger.info("RAG system successfully initialized")
        return True

    def _load_documents(self):
        """Load documents from file with enhanced validation and error reporting"""
        logger.info(f"Loading documents from {self.knowledge_base_path}...")

        # Verify file exists
        kb_path = Path(self.knowledge_base_path)
        if not kb_path.exists():
            logger.error(f"Knowledge base file not found at {kb_path.absolute()}")
            return False

        try:
            with open(kb_path, "r", encoding="utf-8") as f:
                content = f.read()

            if not content:
                logger.error("Knowledge base file is empty")
                return False

            logger.info(f"Successfully read knowledge base file, size: {len(content)} bytes")

            # Print raw section names for debugging
            section_matches = re.findall(r'# (\w+(?:\s+\w+)*) DOCUMENTS #', content)
            logger.info(f"Found raw section names: {section_matches}")

            # Split content into document collections by section
            section_pattern = r'# (\w+(?:\s+\w+)*) DOCUMENTS #'
            sections = re.split(section_pattern, content)

            if len(sections) <= 1:
                logger.error("Could not parse any sections from knowledge base")
                return False

            logger.info(f"Found {(len(sections)-1)//2} sections in knowledge base")

            # Process each section (odd indices are section names, even indices are content)
            for i in range(1, len(sections), 2):
                if i+1 < len(sections):
                    section_name = sections[i].strip()
                    section_content = sections[i+1].strip()
                    logger.info(f"Processing section: {section_name}, content length: {len(section_content)}")

                    # Extract documents (lines starting with "DOCUMENT X:")
                    docs = []
                    for doc in re.split(r'DOCUMENT \w+:', section_content):
                        if doc.strip():
                            docs.append(doc.strip())

                    if not docs:
                        logger.warning(f"No documents extracted from section {section_name}")
                        continue

                    # Map section names to agent types
                    agent_type = self._map_section_to_agent(section_name)
                    logger.info(f"Mapped section '{section_name}' to agent type '{agent_type}'")

                    # Store in document collections
                    self.doc_collections[agent_type] = docs
                    logger.info(f"Loaded {len(docs)} documents for {agent_type} agent")

                    # Print first few chars of first doc for verification
                    if docs:
                        logger.info(f"Sample doc for {agent_type}: {docs[0][:100]}...")

            # Verify -  loaded at least some documents
            if not self.doc_collections:
                logger.error("No documents were successfully loaded")
                return False

            logger.info(f"Successfully loaded documents for {list(self.doc_collections.keys())}")
            return True

        except Exception as e:
            logger.error(f"Error loading documents: {str(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            return False

    def _map_section_to_agent(self, section_name):
        """Map section names to agent types with improved matching"""
        # Normalize section name
        section_name = section_name.lower().strip()

        # Direct mappings for commonly used section names
        direct_mappings = {
            "device inventory": "device_inventory",
            "device search": "device_search",
            "troubleshooting": "troubleshooting",
            "observability": "observability",
            "knowledge base": "knowledge_base",
            "incident resolution": "incident_resolution",
            "incident": "incident_resolution"
        }

        # Check for exact match first
        if section_name in direct_mappings:
            return direct_mappings[section_name]

        # Keyword-based mapping as fallback
        mappings = {
            "troubleshooting": ["troubleshoot", "trouble", "issue", "problem", "error", "diagnos"],
            "observability": ["observ", "monitor", "metric", "alert", "threshold", "capac", "trend"],
            "device_search": ["device search", "topolog", "network", "infrastructure", "dependencies"],
            "device_inventory": ["inventory", "device inventory", "equipment", "assets", "ci"],
            "knowledge_base": ["knowledge", "protocol", "secur", "best", "practice", "reference", "guide"],
            "incident_resolution": ["incident", "sever", "resolut", "response", "communication"]
        }

        # Check each agent type's keyword list
        for agent_type, keywords in mappings.items():
            for keyword in keywords:
                if keyword in section_name:
                    logger.info(f"Mapped section '{section_name}' to agent '{agent_type}' via keyword '{keyword}'")
                    return agent_type

        # Default mapping
        logger.warning(f"No mapping found for section '{section_name}', using default mapping")
        if "device" in section_name:
            return "device_search"
        return "knowledge_base"  # Better default than "general"

    def _create_vector_stores(self):
        """Embed every document collection into its own Chroma vector store.

        Each agent type's documents are chunked (500 characters, 50 overlap),
        embedded with ``sentence-transformers/all-MiniLM-L6-v2`` and written to
        a Chroma collection named after the agent type. A store is only
        registered in ``self.vector_stores`` after a smoke query returns at
        least one hit.

        Returns:
            bool: True when at least one store was created and validated;
            False when there is nothing to index, the embedding model cannot be
            initialized, or no store passed validation.
        """
        logger.info("Creating vector stores...")

        if not self.doc_collections:
            logger.error("No document collections available to index")
            return False

        try:
            # Initialize embeddings model with validation
            try:
                embeddings = HuggingFaceEmbeddings(
                    model_name="sentence-transformers/all-MiniLM-L6-v2"
                )
                # Quick test to verify embeddings work
                test_embedding = embeddings.embed_query("test")
                if not test_embedding:
                    raise ValueError("Embedding model returned empty embeddings")
                logger.info(f"Embeddings validated, dimension: {len(test_embedding)}")
            except Exception as e:
                logger.error(f"Failed to initialize embeddings model: {e}")
                return False

            # Create text splitter
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,
                chunk_overlap=50
            )

            # Process each document collection
            successful_stores = 0
            for agent_type, docs in self.doc_collections.items():
                texts = []
                metadatas = []
                ids = []

                # Split documents into chunks
                for i, doc in enumerate(docs):
                    chunks = text_splitter.split_text(doc)
                    logger.info(f"Split document {i+1} for {agent_type} into {len(chunks)} chunks")
                    for j, chunk in enumerate(chunks):
                        texts.append(chunk)
                        metadatas.append({
                            "source": f"Document {i+1} for {agent_type}",
                            "agent_type": agent_type,
                            "chunk_id": j
                        })
                        # Deterministic ID: keeps chunk identity stable if
                        # initialization is attempted more than once
                        ids.append(f"{agent_type}-doc{i+1}-chunk{j}")

                if not texts:
                    logger.warning(f"No text chunks generated for {agent_type}")
                    continue

                logger.info(f"Creating vector store for {agent_type} with {len(texts)} chunks")
                try:
                    # Without an explicit collection_name every store uses
                    # LangChain's default collection, so the in-process Chroma
                    # stores can end up sharing one collection and each
                    # "per-agent" store would return other agents' chunks.
                    vector_store = Chroma.from_texts(
                        texts=texts,
                        embedding=embeddings,
                        metadatas=metadatas,
                        ids=ids,
                        collection_name=f"rag_{agent_type}"
                    )

                    # Validate the vector store with a simple query
                    test_results = vector_store.similarity_search(f"test query for {agent_type}", k=1)
                    if len(test_results) > 0:
                        logger.info(f"Vector store for {agent_type} validated")
                        self.vector_stores[agent_type] = vector_store
                        successful_stores += 1
                    else:
                        logger.error(f"Vector store for {agent_type} failed validation check")
                except Exception as e:
                    logger.error(f"Error creating vector store for {agent_type}: {e}")

            # Verify at least some vector stores were created
            if successful_stores == 0:
                logger.error("No vector stores were successfully created")
                return False

            logger.info(f"Successfully created {successful_stores} vector stores: {list(self.vector_stores.keys())}")
            return True

        except Exception as e:
            logger.error(f"Error in vector store creation: {str(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            return False

    def _init_llm(self):
        """Load TinyLlama behind a HuggingFace text-generation pipeline.

        On success ``self.llm`` wraps the pipeline and has answered one smoke
        prompt. On any failure the error is logged and a canned-response mock
        LLM is installed when available (otherwise ``self.llm`` is set to
        None).

        Returns:
            bool: True only when the real model loaded and produced a non-empty
            response to the smoke prompt; False otherwise, including when the
            mock fallback is in place.
        """
        logger.info("Loading language model...")

        try:
            model_id = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
            tokenizer = AutoTokenizer.from_pretrained(model_id)
            model = AutoModelForCausalLM.from_pretrained(
                model_id,
                torch_dtype="auto",
                device_map="auto"
            )

            # max_new_tokens bounds only the generated text (not prompt + output)
            pipe = pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer,
                max_new_tokens=512,  # Allow generating up to 512 new tokens
                do_sample=True,      # Enable sampling
                temperature=0.7,
                top_p=0.95,
                repetition_penalty=1.1
            )

            self.llm = HuggingFacePipeline(pipeline=pipe)

            # Validate the LLM with a simple query
            test_response = self.llm.invoke("Test query to verify model is working.")
            if not test_response:
                logger.error("LLM validation failed - empty response")
                return False

            logger.info("LLM initialized and validated")
            return True

        except Exception as e:
            logger.error(f"Error initializing LLM: {str(e)}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")

            # Install a simple mock LLM for testing. The import is guarded so a
            # failure here cannot escape the error handler, and it uses the
            # langchain_community path like the rest of this module instead of
            # the deprecated langchain.llms shim.
            try:
                from langchain_community.llms.fake import FakeListLLM
                responses = [
                    "This is a mock response because the real LLM could not be initialized. Please check the logs for details."
                ]
                self.llm = FakeListLLM(responses=responses)
                logger.warning("Using mock LLM due to initialization error")
            except Exception as fallback_error:
                self.llm = None
                logger.error(f"Mock LLM fallback unavailable: {fallback_error}")
            return False

    def _query_all_stores(self, query_text, k=3):
        """Query all available vector stores when the specific one isn't found"""
        logger.info(f"Performing cross-store query: {query_text}")

        all_docs = []
        all_sources = []

        # Sort stores by potential relevance (device queries should check device-related stores first)
        prioritized_stores = sorted(
            self.vector_stores.items(),
            key=lambda x: 1 if "device" in x[0] else 2  # Prioritize device-related stores
        )

        for store_name, vector_store in prioritized_stores:
            try:
                logger.info(f"Querying '{store_name}' store")
                docs = vector_store.similarity_search(query_text, k=k)
                if docs:
                    logger.info(f"Found {len(docs)} docs in '{store_name}' store")
                    all_docs.extend(docs)
                    all_sources.extend([doc.metadata.get("source", f"Unknown from {store_name}") for doc in docs])
            except Exception as e:
                logger.warning(f"Error querying {store_name} store: {e}")

        # Sort by relevance (simplistic approach)
        return all_docs[:k*2], all_sources[:k*2]  # Return more than k to allow filtering

    def query(self, agent_type, query_text, k=3, fallback_to_general=True):
        """Query the RAG system with enhanced retrieval and cross-store fallback"""
        if not self.initialized:
            logger.info("System not initialized, initializing now...")
            if not self.initialize():
                return {
                    "response": "The RAG system failed to initialize properly. Please check the logs.",
                    "sources": [],
                    "retrieved_content": []
                }

        # Add diagnostic logging
        logger.info(f"Available vector stores: {list(self.vector_stores.keys())}")
        logger.info(f"Available document collections: {list(self.doc_collections.keys())}")

        # Handle case where query_text might be a dictionary
        if isinstance(query_text, dict):
            if "description" in query_text:
                query_text = query_text["description"]
            else:
                query_text = str(query_text)

        # Preprocess query
        query_text = query_text.strip().lower()

        logger.info(f"RAG query for {agent_type}: {query_text}")

        # Get the vector store for this agent
        vector_store = self.vector_stores.get(agent_type)
        docs = []
        sources = []

        if not vector_store:
            logger.warning(f"No vector store available for {agent_type}")

            # Try cross-store searching first
            logger.info("Attempting cross-store search")
            docs, sources = self._query_all_stores(query_text, k=k)

            if not docs and fallback_to_general:
                # If cross-store search fails, try the traditional fallbacks
                if "knowledge_base" in self.vector_stores:
                    logger.info(f"Falling back to 'knowledge_base' vector store")
                    agent_type = "knowledge_base"
                    vector_store = self.vector_stores["knowledge_base"]
                elif self.vector_stores:
                    # Last resort - use any available store
                    fallback_agent = next(iter(self.vector_stores.keys()))
                    logger.info(f"Falling back to '{fallback_agent}' vector store")
                    agent_type = fallback_agent
                    vector_store = self.vector_stores[fallback_agent]
                else:
                    logger.error("No vector stores available")
                    return {
                        "response": f"I don't have specific knowledge for this query type: {agent_type}.",
                        "sources": [],
                        "retrieved_content": []
                    }

        # If get docs from cross-store search, use those directly
        if not docs and vector_store:
            try:
                # Retrieve relevant documents from the specific store
                retrieval_k = k * 2  # Retrieve more docs than needed for filtering
                logger.info(f"Performing similarity search for {agent_type}, k={retrieval_k}")
                docs = vector_store.similarity_search(query_text, k=retrieval_k)
                logger.info(f"Retrieved {len(docs)} documents")

                # Extract sources
                sources = [doc.metadata.get("source", "Unknown") for doc in docs]

            except Exception as e:
                logger.error(f"Error in similarity search: {e}")
                docs = []
                sources = []

        if not docs:
            logger.warning(f"No documents retrieved for query: {query_text}")
            return {
                "response": "I couldn't find specific information to answer your query. Please try rephrasing your question.",
                "sources": [],
                "retrieved_content": []
            }

        # Extract content
        retrieved_contents = [doc.page_content for doc in docs]

        # Filter to most relevant top k
        retrieved_contents = retrieved_contents[:k]
        sources = sources[:k]

        # Log sources for debugging
        for i, (src, content) in enumerate(zip(sources, retrieved_contents)):
            logger.info(f"Retrieved doc {i+1}: {src}")
            logger.info(f"Content preview: {content[:100]}...")

        # Format retrieved content
        context = "\n\n".join([f"Content from {src}:\n{content}" for src, content in zip(sources, retrieved_contents)])

        # Create agent-specific prompts
        if agent_type == "troubleshooting":
            system_prompt = "You are a network troubleshooting expert. Use the retrieved information to diagnose the problem."
            prompt_template = """<|im_start|>system
{system_prompt}
<|im_end|>
<|im_start|>user
I need help troubleshooting this network issue:
{query}

Here is relevant information from our knowledge base:
{context}

Based on this information, please provide:
1. An analysis of the likely root cause
2. Possible contributing factors
3. Recommended troubleshooting steps
4. Priority assessment
<|im_end|>
<|im_start|>assistant
"""
        elif agent_type == "device_search" or agent_type == "device_inventory":
            system_prompt = "You are a network topology expert. Help identify device information and relationships."
            prompt_template = """<|im_start|>system
{system_prompt}
<|im_end|>
<|im_start|>user
I need information about network devices:
{query}

Here is relevant information about our network devices:
{context}

Based on this information, please provide:
1. Detailed device information that matches the query
2. The importance of these devices in the network
3. Any relationships or dependencies with other devices
<|im_end|>
<|im_start|>assistant
"""
        else:
            # Generic prompt for other agent types
            system_prompt = "You are a network expert. Provide information based on the retrieved knowledge."
            prompt_template = """<|im_start|>system
{system_prompt}
<|im_end|>
<|im_start|>user
{query}

Here is relevant information:
{context}

Please provide a helpful response based on this information.
<|im_end|>
<|im_start|>assistant
"""

        # Format the prompt
        prompt = ChatPromptTemplate.from_template(prompt_template)
        formatted_prompt = prompt.format(
            system_prompt=system_prompt,
            query=query_text,
            context=context
        )

        # Generate response
        try:
            response = self.llm.invoke(formatted_prompt)

            # Return results
            return {
                "response": response,
                "sources": sources,
                "retrieved_content": retrieved_contents
            }
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")

            return {
                "response": f"I retrieved relevant information but encountered an error when generating a response: {str(e)}",
                "sources": sources,
                "retrieved_content": retrieved_contents
            }

# Function used in gradio_app.py to initialize the LLM for LangGraph
def init_llm_for_langgraph():
    """Load the TinyLlama chat model and wrap it for use with LangChain/LangGraph.

    Builds a Hugging Face ``text-generation`` pipeline capped at 512 newly
    generated tokens and wraps it in ``HuggingFacePipeline``.

    Returns:
        A ``HuggingFacePipeline`` on success. If loading the tokenizer, model or
        pipeline raises, the error is logged and a ``FakeListLLM`` holding a
        single canned reply is returned instead.
    """
    logger.info("Loading TinyLlama model for LangGraph...")

    try:
        model_id = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype="auto",
            device_map="auto"
        )

        # Use max_new_tokens instead of max_length to avoid error while processing
        pipe = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=512,  # Allow generating up to 512 new tokens
            # Sampling must be switched on explicitly; without it the
            # temperature/top_p settings below may be ignored by generation.
            do_sample=True,
            temperature=0.7,
            top_p=0.95,
            repetition_penalty=1.1
        )

        return HuggingFacePipeline(pipeline=pipe)
    except Exception as e:
        logger.error(f"Error initializing LLM: {e}")

        # Create a simple mock LLM for testing
        from langchain.llms.fake import FakeListLLM
        responses = [
            "This is a mock response for testing purposes. The real LLM could not be initialized."
        ]
        logger.warning("Using mock LLM due to initialization error")
        return FakeListLLM(responses=responses)


Writing rag_system_updated.py


### Creating The AI Chatbot
- Our UI is developed using Gradio for interaction with data through MCP -> LLM -> RAG/LangGraph

In [8]:
%%writefile update_gradio_app.py
import gradio as gr
import json
import logging
import re
from rag_system_updated import NetworkRAGSystem
from full_mcp_implementation import create_mcp_system, AgentContext
from langgraph_device_search_agent import LangGraphDeviceSearchAgent
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from langchain_community.llms import HuggingFacePipeline

# Configure logging
# Root logging is set to INFO; this module logs under the "gradio_app" name.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gradio_app")

# Initialize the RAG system with correct path
# The absolute path points into the working directory the notebook switched to
# (/content/agentic_rag-mcp_system), where the knowledge base file is written.
rag_system = NetworkRAGSystem(knowledge_base_path="/content/agentic_rag-mcp_system/network_knowledge_base.txt")

# Initialize the LLM for LangGraph
def init_llm_for_langgraph():
    """Create the LLM handed to the LangGraph device search agent.

    Tries to load TinyLlama as a sampling text-generation pipeline wrapped in
    ``HuggingFacePipeline``. On any failure the error is logged and a
    ``FakeListLLM`` with one fixed reply is returned in its place.
    """
    logger.info("Loading TinyLlama model for LangGraph...")

    model_id = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
    # Generation settings; max_new_tokens is used instead of max_length.
    generation_settings = {
        "max_new_tokens": 512,      # Allow generating up to 512 new tokens
        "do_sample": True,          # Enable sampling
        "temperature": 0.7,
        "top_p": 0.95,
        "repetition_penalty": 1.1,
    }

    try:
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        model = AutoModelForCausalLM.from_pretrained(
            model_id, torch_dtype="auto", device_map="auto"
        )
        text_generator = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            **generation_settings,
        )
        return HuggingFacePipeline(pipeline=text_generator)
    except Exception as e:
        logger.error(f"Error initializing LLM: {e}")

        # Fall back to a canned-response LLM so the rest of the app can still start
        from langchain.llms.fake import FakeListLLM
        logger.warning("Using mock LLM due to initialization error")
        return FakeListLLM(
            responses=[
                "This is a mock response for testing purposes. The real LLM could not be initialized."
            ]
        )

# Initialize LangGraph Device Search Agent
# These statements run at import time, so importing this module triggers the
# LLM load performed by init_llm_for_langgraph().
llm = init_llm_for_langgraph()
device_search_agent = LangGraphDeviceSearchAgent(llm=llm, rag_system=rag_system)

# Initialize the MCP system with LangGraph agent
mcp_registry = create_mcp_system(rag_system, device_search_agent)

# Create a shared context that persists across interactions
# A single module-level instance is passed as `context` to every
# mcp_registry.process_query() call made by the interface functions below.
shared_context = AgentContext()

def initialize_system():
    """Initialize the RAG system and report whether it succeeded.

    Returns:
        The value returned by ``rag_system.initialize()``.
    """
    logger.info("Initializing system...")
    initialized = rag_system.initialize()
    if not initialized:
        logger.error("System initialization failed")
    else:
        logger.info("System initialized successfully")
    return initialized

def format_sources(sources):
    """Render a list of sources as a "Sources:" heading followed by one bullet per entry.

    Returns the fixed text "No sources used" when ``sources`` is empty or falsy.
    """
    if sources:
        bullet_lines = "\n".join(f"- {entry}" for entry in sources)
        return "Sources:\n" + bullet_lines
    return "No sources used"

# Defining Interfaces for each Agent on Gradio

def _lower_keys(record):
    """Return a copy of ``record`` whose keys are lower-cased strings."""
    return {str(key).lower(): value for key, value in record.items()}


def _device_display_name(device_id, devices):
    """Map a device ID to the name of the matching entry in ``devices``.

    The comparison is case-insensitive and accepts either ``ci_id`` or ``id``
    as the identifier field, matching how the device listing reads IDs.
    Returns ``device_id`` unchanged when no entry matches.
    """
    wanted = str(device_id).lower()
    for device in devices:
        fields = _lower_keys(device)
        candidate = fields.get('ci_id', fields.get('id', ''))
        # str() guards against non-string identifiers (e.g. integers)
        if str(candidate).lower() == wanted:
            return fields.get('name', device_id)
    return device_id


def _format_linked_devices(heading, linked_by_device, devices):
    """Render one "### <heading>" section listing the neighbours of each device.

    ``linked_by_device`` maps a device ID to a list of neighbour records;
    devices with an empty list are skipped.
    """
    section = f"### {heading}\n\n"
    for device_id, linked in linked_by_device.items():
        if not linked:
            continue
        section += f"**{_device_display_name(device_id, devices)}** connects to:\n"
        for neighbour in linked:
            fields = _lower_keys(neighbour)
            neighbour_name = fields.get('name', 'Unknown')
            neighbour_type = fields.get('ci_type', fields.get('type', 'Unknown'))
            section += f"- {neighbour_name} ({neighbour_type})\n"
        section += "\n"
    return section


def device_search_interface(query):
    """Handle device search queries using MCP with consistent field handling.

    Sends ``query`` to the MCP registry as a "device_search" request and
    renders the result as Markdown: matched devices, topology connections,
    upstream/downstream neighbours, affected services, sources and agent
    attribution. Record keys are matched case-insensitively throughout.

    Args:
        query: The search text entered by the user.

    Returns:
        A Markdown string. If the result contains an "error" key, or if
        processing raises, a "## Device Search Results" block describing the
        failure is returned instead of raising.
    """
    logger.info(f"Processing device search query: {query}")

    try:
        # Process with MCP system, specifying the agent type explicitly
        result = mcp_registry.process_query(query, agent_type="device_search", context=shared_context)

        # Debug: Log the exact device structure we're receiving
        logger.info(f"Device search result structure: {list(result.keys())}")

        if "error" in result:
            error_msg = result.get("error", "Unknown error")
            return f"## Device Search Results\nThe search could not be completed: {error_msg}\n\nNo sources used."

        # Format the found devices
        output = "## Device Search Results\n\n"

        # Devices arrive under "devices" or "found_devices" depending on the
        # producer; a missing or None value is treated as an empty list.
        if "devices" in result:
            devices = result.get("devices") or []
        else:
            devices = result.get("found_devices") or []

        if devices:
            output += f"### Found {len(devices)} Devices\n\n"
            for device in devices:
                # Normalize keys to lowercase for consistent access
                device_norm = _lower_keys(device)

                # Extract fields with fallbacks
                device_name = device_norm.get('name', device_norm.get('description', 'Unknown Device'))
                device_type = device_norm.get('ci_type', device_norm.get('type', 'Unknown'))
                device_id = device_norm.get('ci_id', device_norm.get('id', 'Unknown'))
                device_status = device_norm.get('status', device_norm.get('state', 'Unknown'))
                device_location = device_norm.get('location', device_norm.get('site', ''))
                device_importance = device_norm.get('importance', device_norm.get('criticality', ''))

                # Format device information
                output += f"- **{device_name}** ({device_type})\n"
                output += f"  - ID: {device_id}\n"
                output += f"  - Status: {device_status}\n"
                if device_location:
                    output += f"  - Location: {device_location}\n"
                if device_importance:
                    output += f"  - Importance: {device_importance}\n"
                output += "\n"
        else:
            output += "No devices found matching your criteria.\n\n"

        # Add topology information from result
        # First check for MCP native format
        if "topology_analysis" in result:
            topology = result.get("topology_analysis", {})
            connections = topology.get("connections", [])
            if connections:
                output += "### Device Connections\n\n"
                for conn in connections:
                    output += f"- {conn.get('from')} → {conn.get('to')} ({conn.get('type', 'connection')})\n"
                output += "\n"

        # Then check for LangGraph format
        upstream_devices = result.get("upstream_devices", {})
        if upstream_devices:
            output += _format_linked_devices("Upstream Connections", upstream_devices, devices)

        downstream_devices = result.get("downstream_devices", {})
        if downstream_devices:
            output += _format_linked_devices("Downstream Connections", downstream_devices, devices)

        # Add service impact information
        affected_services = result.get("affected_services", {})
        if affected_services:
            output += "### Affected Services\n\n"

            # Deduplicate services by ID. Keys are normalized first so a
            # differently-cased "service_id" key is not silently dropped.
            unique_services = {}
            for service_list in affected_services.values():
                for service in service_list or []:
                    service_fields = _lower_keys(service)
                    service_id = service_fields.get('service_id')
                    if service_id and service_id not in unique_services:
                        unique_services[service_id] = service_fields

            for service_fields in unique_services.values():
                svc_name = service_fields.get('name', 'Unknown')
                svc_crit = service_fields.get('criticality', 'Unknown')
                output += f"- **{svc_name}** (Criticality: {svc_crit})\n"

        # Add sources
        sources = result.get("sources", [])
        if sources:
            output += f"\n## {format_sources(sources)}"

        # Add MCP agent attribution
        output += f"\n\n*Processed by {result.get('agent_type', 'Unknown')} agent ({result.get('agent_id', 'Unknown')})*"

        return output

    except Exception as e:
        logger.error(f"Error in device search interface: {e}")
        import traceback
        logger.error(f"Traceback: {traceback.format_exc()}")
        return f"## Device Search Results\nError processing query: {str(e)}\n\nNo sources used."

def troubleshooting_interface(issue_description, related_cis, logs):
    """Run a troubleshooting query through MCP and render the result as Markdown.

    Args:
        issue_description: Free-text description of the problem.
        related_cis: Optional text naming related devices; left out of the query when empty.
        logs: Optional log text; left out of the query when empty.

    Returns:
        A Markdown string with the analysis, suggested actions (if any),
        sources and agent attribution. An "## Error" section is returned when
        the result has no "analysis" key or when processing raises.
    """
    # Combine inputs into a single query
    query = f"Issue: {issue_description}\n"
    if related_cis:
        query += f"Related devices: {related_cis}\n"
    if logs:
        query += f"Logs: {logs}\n"

    try:
        # Process with MCP system
        result = mcp_registry.process_query(query, agent_type="troubleshooting", context=shared_context)

        if "analysis" not in result:
            return f"## Error\n{result.get('error', 'Unknown error occurred')}"

        response = result["analysis"]
        suggested_actions = result.get("suggested_actions", [])
        action_text = "\n".join(f"- {action}" for action in suggested_actions) if suggested_actions else ""
        sources = format_sources(result.get("sources", []))

        # Format output
        output = f"## Analysis\n{response}\n\n"
        if action_text:
            output += f"## Suggested Actions\n{action_text}\n\n"
        output += f"## {sources}"

        # Add MCP agent attribution
        output += f"\n\n*Processed by {result.get('agent_type', 'Unknown')} agent ({result.get('agent_id', 'Unknown')})*"
        return output
    except Exception as e:
        # Same policy as device_search_interface: log with traceback and show
        # the failure in the UI instead of letting the exception escape.
        logger.exception(f"Error in troubleshooting interface: {e}")
        return f"## Error\nError processing query: {str(e)}"

def knowledge_base_interface(query, doc_type):
    """Run a knowledge base query through MCP and render the result as Markdown.

    Args:
        query: The search text entered by the user.
        doc_type: Selected document type; appended to the query unless it is
            empty or "All Types".

    Returns:
        A Markdown string with the answer, related topics (if any), sources
        and agent attribution. An "## Error" section is returned when the
        result has no "answer" key or when processing raises.
    """
    # Add document type to query if specified
    if doc_type and doc_type != "All Types":
        query = f"{query} (document type: {doc_type})"

    try:
        # Process with MCP system
        result = mcp_registry.process_query(query, agent_type="knowledge_base", context=shared_context)

        if "answer" not in result:
            return f"## Error\n{result.get('error', 'Unknown error occurred')}"

        response = result["answer"]
        sources = format_sources(result.get("sources", []))
        related_topics = result.get("related_topics", [])

        # Format output
        output = f"## Knowledge Base Information\n{response}\n\n"

        if related_topics:
            topic_text = ", ".join(related_topics)
            output += f"## Related Topics\n{topic_text}\n\n"

        output += f"## {sources}"

        # Add MCP agent attribution
        output += f"\n\n*Processed by {result.get('agent_type', 'Unknown')} agent ({result.get('agent_id', 'Unknown')})*"
        return output
    except Exception as e:
        # Same policy as device_search_interface: log with traceback and show
        # the failure in the UI instead of letting the exception escape.
        logger.exception(f"Error in knowledge base interface: {e}")
        return f"## Error\nError processing query: {str(e)}"

def observability_interface(ci_types, metrics, time_range):
    """Run an observability query through MCP and render the result as Markdown.

    Args:
        ci_types: Text listing the CI types to analyze.
        metrics: Text listing the metrics to analyze.
        time_range: The selected time range label.

    Returns:
        A Markdown string with the health assessment, sources and agent
        attribution. An "## Error" section is returned when the result has no
        "assessment" key or when processing raises.
    """
    # Combine inputs into a single query
    query = f"I need to analyze metrics for CI types: {ci_types}, focusing on these metrics: {metrics}, over time range: {time_range}"

    try:
        # Process with MCP system
        result = mcp_registry.process_query(query, agent_type="observability", context=shared_context)

        if "assessment" not in result:
            return f"## Error\n{result.get('error', 'Unknown error occurred')}"

        response = result["assessment"]
        sources = format_sources(result.get("sources", []))

        # Format output
        output = f"## Network Health Assessment\n{response}\n\n"
        output += f"## {sources}"

        # Add MCP agent attribution
        output += f"\n\n*Processed by {result.get('agent_type', 'Unknown')} agent ({result.get('agent_id', 'Unknown')})*"
        return output
    except Exception as e:
        # Same policy as device_search_interface: log with traceback and show
        # the failure in the UI instead of letting the exception escape.
        logger.exception(f"Error in observability interface: {e}")
        return f"## Error\nError processing query: {str(e)}"

def incident_resolution_interface(incident_id, title, description, status, priority, affected_cis):
    """Run an incident resolution query through MCP and render the result as Markdown.

    Args:
        incident_id: Incident identifier text.
        title: Incident title.
        description: Free-text incident description.
        status: Selected incident status.
        priority: Selected incident priority.
        affected_cis: Text listing the affected configuration items.

    Returns:
        A Markdown string with the summary, numbered action items (if any),
        sources and agent attribution. An "## Error" section is returned when
        the result has no "summary" key or when processing raises.
    """
    # Combine inputs into a single query
    query = f"Incident ID: {incident_id}\nTitle: {title}\nDescription: {description}\nStatus: {status}\nPriority: {priority}\nAffected CIs: {affected_cis}"

    try:
        # Process with MCP system
        result = mcp_registry.process_query(query, agent_type="incident_resolution", context=shared_context)

        if "summary" not in result:
            return f"## Error\n{result.get('error', 'Unknown error occurred')}"

        response = result["summary"]
        sources = format_sources(result.get("sources", []))
        action_items = result.get("action_items", [])

        # Format output
        output = f"## Incident Resolution\n{response}\n\n"

        if action_items:
            output += "## Action Items\n"
            for i, item in enumerate(action_items, 1):
                output += f"{i}. {item.get('action')}\n"
                output += f"   - Owner: {item.get('owner')}\n"
                output += f"   - Deadline: {item.get('deadline')}\n\n"

        output += f"## {sources}"

        # Add MCP agent attribution
        output += f"\n\n*Processed by {result.get('agent_type', 'Unknown')} agent ({result.get('agent_id', 'Unknown')})*"
        return output
    except Exception as e:
        # Same policy as device_search_interface: log with traceback and show
        # the failure in the UI instead of letting the exception escape.
        logger.exception(f"Error in incident resolution interface: {e}")
        return f"## Error\nError processing query: {str(e)}"

# Create the Gradio app with tabs
# Build the tabbed Gradio UI: one tab per agent interface function defined
# above, plus a static "About" tab. Each button's click handler passes the
# tab's input components to its interface function and shows the returned
# Markdown in the tab's output component.
with gr.Blocks(title="GenAI Enabled Integrated Platform Support ") as demo:
    gr.Markdown("# GenAI Enabled Integrated Platform")
    gr.Markdown("""
    This demo showcases a complete Model Context Protocol (MCP) implementation with RAG-enhanced agents.
    The system maintains context across different agent interactions. The Power of Agentic RAG and LangGraph for efficient information retrieval.
    """)

    # Initialize the system when loading
    # The hidden checkbox receives the boolean returned by initialize_system().
    system_initialized = gr.Checkbox(value=False, visible=False, label="System Initialized")

    demo.load(initialize_system, None, system_initialized)

    with gr.Tabs():
        with gr.Tab("Device Search"):
            with gr.Group():
                search_query = gr.Textbox(label="Search Query", placeholder="e.g., 'Find all routers in NYC' or 'Show critical devices'")
                search_btn = gr.Button("Search Devices")
                search_output = gr.Markdown(label="Search Results")
                search_btn.click(device_search_interface, [search_query], search_output)

                gr.Markdown("""
                ### Example Queries
                - "Find all active routers"
                - "core devices with high criticality"
                - "Find switches in NYC"
                - "Show all firewall devices"
                - "Find router R001"
                """)

        with gr.Tab("Troubleshooting"):
            with gr.Group():
                issue_description = gr.Textbox(label="Issue Description", lines=4, placeholder="Describe the network issue you're experiencing...")
                related_cis = gr.Textbox(label="Related Configuration Items (comma-separated)", placeholder="e.g., router-01, switch-03, firewall-02")
                logs = gr.Textbox(label="Relevant Logs (optional)", lines=4, placeholder="Paste any relevant log entries here...")
                troubleshoot_btn = gr.Button("Analyze Issue")
                troubleshoot_output = gr.Markdown(label="Analysis Results")
                troubleshoot_btn.click(troubleshooting_interface, [issue_description, related_cis, logs], troubleshoot_output)

        with gr.Tab("Observability"):
            with gr.Group():
                ci_types = gr.Textbox(label="CI Types to Analyze (comma-separated)", value="router, switch")
                metrics = gr.Textbox(label="Metrics to Analyze (comma-separated)", value="cpu_utilization, latency, memory_utilization")
                time_range = gr.Dropdown(label="Time Range", choices=["last_1h", "last_6h", "last_12h", "last_24h", "last_3d", "last_7d"], value="last_24h")
                observe_btn = gr.Button("Analyze Metrics")
                observe_output = gr.Markdown(label="Analysis Results")
                observe_btn.click(observability_interface, [ci_types, metrics, time_range], observe_output)

        with gr.Tab("Knowledge Base"):
            with gr.Group():
                kb_query = gr.Textbox(label="Search Query", placeholder="e.g., 'Network latency troubleshooting' or 'Firewall best practices'")
                kb_doc_type = gr.Dropdown(label="Document Type", choices=["All Types", "manual", "faq", "best_practice", "troubleshooting_guide", "reference"], value="All Types")
                kb_btn = gr.Button("Search Knowledge Base")
                kb_output = gr.Markdown(label="Search Results")
                kb_btn.click(knowledge_base_interface, [kb_query, kb_doc_type], kb_output)

        with gr.Tab("Incident Resolution"):
            with gr.Group():
                inc_id = gr.Textbox(label="Incident ID", value="INC-001")
                inc_title = gr.Textbox(label="Incident Title", value="Network Outage in NYC Office")
                inc_description = gr.Textbox(label="Description", lines=4, value="Users in the NYC office reported a complete loss of network connectivity at 9:15 AM.")
                inc_status = gr.Dropdown(label="Status", choices=["open", "in_progress", "resolved", "closed"], value="resolved")
                inc_priority = gr.Dropdown(label="Priority", choices=["critical", "high", "medium", "low"], value="critical")
                inc_cis = gr.Textbox(label="Affected CIs (comma-separated)", value="router-nyc-01, switch-nyc-03, firewall-nyc-01")
                inc_btn = gr.Button("Generate Incident Summary")
                inc_output = gr.Markdown(label="Incident Summary")
                inc_btn.click(incident_resolution_interface, [inc_id, inc_title, inc_description, inc_status, inc_priority, inc_cis], inc_output)

        with gr.Tab("About MCP Framework & LangGraph"):
            gr.Markdown("""
            ## Model Context Protocol (MCP) Framework

            This system demonstrates the four key elements of MCP:

            ### 1. Specialized Agents

            Each agent has specific expertise and capabilities:
            - **Troubleshooting Agent**: Diagnoses network issues and suggests resolution steps
            - **Device Search Agent**: Finds devices and analyzes topology relationships
            - **Knowledge Base Agent**: Retrieves information and documentation
            - **Observability Agent**: Analyzes metrics and detects anomalies
            - **Incident Resolution Agent**: Manages incident lifecycle and provides guidance

            ### 2. Context Management

            The MCP framework maintains shared context between agents:
            - Conversation history for continuity between queries
            - Entity memory to track devices, services, and incidents
            - Execution state to share information between agent interactions

            ### 3. Agent Registry

            All agents are registered in a central system:
            - Automatic routing of queries to the most appropriate agent
            - Capability-based discovery for specialized functions
            - Metadata tracking for better transparency

            ### 4. Knowledge Integration

            Relevant knowledge is retrieved dynamically:
            - RAG system integration for retrieving domain knowledge
            - Context-aware queries that incorporate previous interactions
            - Cross-agent knowledge sharing

            ### How It Works

            When you submit a query:
            1. The system analyzes your query to determine the most appropriate agent
            2. The selected agent retrieves relevant knowledge using RAG
            3. Query context is enriched with information from previous interactions
            4. The agent processes the query and builds a structured response
            5. Context is updated for future interactions

            ### Integration with LangGraph

            The Device Search agent leverages LangGraph for a multi-step workflow:
            1. Query parsing: Interprets intent and extracts search criteria
            2. Device search: Finds devices matching criteria
            3. Topology analysis: Maps relationships between devices
            4. Results formatting: Organizes findings in a structured format

            This gives you the benefits of both LangGraph's structured workflow and MCP's context sharing.
            """)

# Entry point to run the app
if __name__ == "__main__":
    # Launch with sharing enabled for Colab access
    demo.launch(share=True)

Writing update_gradio_app.py


### Agent Specific Documents for RAG implementation and Embeddings

In [9]:
# Create a document collection for all agent types
%%writefile network_knowledge_base.txt

# DEVICE INVENTORY DOCUMENTS #
DOCUMENT INV1:
Core Network Devices
- Device ID: R001
  Name: core-router-01
  Type: router
  Status: active
  Location: NYC
  Importance: critical
  Description: Primary core router for east region datacenter
  Connected to: WAN001, S001A, S001B

- Device ID: R002
  Name: core-router-02
  Type: router
  Status: active
  Location: SFO
  Importance: critical
  Description: Primary core router for west region datacenter
  Connected to: WAN002, S002A, S002B

- Device ID: FW001
  Name: edge-firewall-01
  Type: firewall
  Status: active
  Location: NYC
  Importance: critical
  Description: Primary internet-facing firewall for east region
  Connected to: R001, DMZ001

DOCUMENT INV2:
Distribution Network Devices
- Device ID: S001A
  Name: distribution-switch-01a
  Type: switch
  Status: active
  Location: NYC
  Importance: high
  Description: Primary distribution switch for NYC east zone
  Connected to: R001, S001A1, S001A2

- Device ID: S001B
  Name: distribution-switch-01b
  Type: switch
  Status: active
  Location: NYC
  Importance: high
  Description: Secondary distribution switch for NYC east zone
  Connected to: R001, S001B1, S001B2

- Device ID: S002A
  Name: distribution-switch-02a
  Type: switch
  Status: active
  Location: SFO
  Importance: high
  Description: Primary distribution switch for SFO north zone
  Connected to: R002, S002A1, S002A2

DOCUMENT INV3:
Access Network Devices
- Device ID: S001A1
  Name: access-switch-01a1
  Type: switch
  Status: active
  Location: NYC-Floor1
  Importance: medium
  Description: Access switch for NYC east zone floor 1
  Connected to: S001A, Various endpoints

- Device ID: S001A2
  Name: access-switch-01a2
  Type: switch
  Status: active
  Location: NYC-Floor2
  Importance: medium
  Description: Access switch for NYC east zone floor 2
  Connected to: S001A, Various endpoints

- Device ID: S001B1
  Name: access-switch-01b1
  Type: switch
  Status: warning
  Location: NYC-Floor3
  Importance: medium
  Description: Access switch for NYC east zone floor 3 (showing interface errors)
  Connected to: S001B, Various endpoints

DOCUMENT INV4:
Special Purpose Devices
- Device ID: LB001
  Name: load-balancer-01
  Type: load-balancer
  Status: active
  Location: NYC
  Importance: high
  Description: F5 load balancer for customer-facing applications
  Connected to: S001A, APP001, APP002, APP003

- Device ID: WAP001
  Name: wireless-controller-01
  Type: wireless-controller
  Status: active
  Location: NYC
  Importance: medium
  Description: Controls all wireless access points in NYC office
  Connected to: S001A

- Device ID: DMZ001
  Name: dmz-switch-01
  Type: switch
  Status: active
  Location: NYC
  Importance: high
  Description: DMZ network switch
  Connected to: FW001, WEB001, WEB002

# TROUBLESHOOTING DOCUMENTS #

DOCUMENT T1:
Router High CPU Troubleshooting Guide
When a router shows high CPU utilization (>80%), first identify the process consuming resources with 'show processes cpu'.
Common causes include:
1. Routing protocol misconfiguration or instability (BGP flapping, OSPF reconvergence)
2. Access Control List (ACL) processing for high traffic volumes
3. Network Address Translation (NAT) for numerous concurrent sessions
4. Denial of Service (DoS) attacks or traffic anomalies
5. Logging verbosity set too high
Resolution steps:
- Check for routing protocol issues with 'show ip route summary' and 'show ip protocol'
- Review ACL configurations and consider hardware acceleration
- Monitor interface errors with 'show interface' for physical issues
- Implement Control Plane Policing (CoPP) for protection
Priority: High (Critical for core routers, Medium for edge devices)

DOCUMENT T2:
Network Latency and Packet Loss Resolution
Latency and packet loss often occur together and can severely impact application performance.
Troubleshooting approach:
1. Quantify the issue - use ping and traceroute to measure latency and loss percentage
2. Isolate the problem area - test segments of the network path separately
3. Check physical connectivity - look for interface errors, duplex mismatches
4. Review QoS configuration - ensure proper traffic prioritization
5. Monitor for microbursts - short traffic spikes that cause buffer overflows
Common root causes:
- Congestion on interfaces (check with 'show interface' for drops/discards)
- Suboptimal routing (check with 'show ip route' and 'traceroute')
- Faulty hardware or cabling (check error counters and interface status)
- MTU mismatches or fragmentation issues
Priority: High (Can significantly impact user experience)

DOCUMENT T3:
Switch Port Errors and Performance Issues
Port errors on switches often indicate physical or data link layer problems.
Error types and causes:
- CRC errors: Often caused by cabling issues, EMI, or duplex mismatches
- Collisions: Typically seen in half-duplex environments
- Input/output errors: Can indicate buffer issues or hardware problems
- Late collisions: Usually a sign of cable length violations or duplex mismatch
Troubleshooting steps:
1. Check error statistics with 'show interface' or 'show interface counters errors'
2. Verify duplex and speed settings on both ends of the connection
3. Test or replace cabling if physical issues are suspected
4. Check for port utilization and buffer exhaustion
5. Verify STP (Spanning Tree Protocol) status and configuration
Priority: Medium (Unless affecting critical services)

# OBSERVABILITY DOCUMENTS #

DOCUMENT O1:
Network Metric Baselines and Thresholds
Understanding normal behavior is essential for effective monitoring.
Key metrics and typical thresholds:
- Interface utilization: Alert at 70% sustained, Critical at 90%
- CPU utilization: Alert at 70% sustained, Critical at 90%
- Memory utilization: Alert at 80% sustained, Critical at 95%
- Packet loss: Alert at 1%, Critical at 5%
- Latency (internal network): Alert at 10ms, Critical at 50ms
- Jitter: Alert at 10ms, Critical at 30ms
Seasonal patterns:
- Business hours typically show 2-3x higher utilization than off-hours
- Month-end processing may increase database traffic by 50%
- Backup windows may saturate certain links during scheduled times
Recommended approach: Establish baselines over 2-4 weeks of normal operation before setting thresholds.

DOCUMENT O2:
Detecting Network Performance Anomalies
Anomaly detection requires correlation of multiple metrics.
Common anomalies and their signatures:
1. DDoS attacks: Sudden increase in traffic volume, connection counts, and CPU usage
2. Routing flaps: Intermittent connectivity, increased CPU, routing protocol messages
3. Broadcast storms: High broadcast/multicast packet counts, increased latency
4. Link flapping: Interface status changes, routing reconvergence, packet loss
5. Memory leaks: Gradually increasing memory usage without corresponding traffic increase
Detection methods:
- Rate of change is often more important than absolute values
- Correlate metrics across multiple devices to identify systemic issues
- Look for divergence from historical patterns at similar time periods
- Set dynamic thresholds based on standard deviations from baseline

DOCUMENT O3:
Network Capacity Planning and Trending
Proactive capacity management prevents performance issues.
Key metrics to trend:
- Interface throughput (95th percentile usage over time)
- Growth rate of connected devices and users
- Connection rates and session counts
- Application-specific metrics (response times, transaction rates)
Warning signs requiring attention:
- Consistent growth of >5% per month on critical links
- Latency increases during peak periods
- Reduction in available headroom below 30%
- Step changes in utilization patterns
Planning guidelines:
- Plan capacity upgrades when utilization is projected to reach 70% within 6 months
- Consider redundancy and failover capacity in calculations
- Account for seasonal variations and special events

# DEVICE SEARCH DOCUMENTS #

DOCUMENT D1:
Network Topology Fundamentals
Understanding dependencies between network devices is crucial for impact analysis.
Network layers and dependencies:
1. Core layer: High-speed backbone, minimal configuration, maximum reliability
2. Distribution layer: Routing, filtering, QoS, and policy enforcement
3. Access layer: End-user connectivity, security features, and service delivery
Critical paths:
- Default gateway dependency: End users depend on their default gateway
- Uplink dependency: Access switches depend on distribution switches
- Routing peer dependency: Routers depend on their routing neighbors
Redundancy considerations:
- Single points of failure should be identified and mitigated
- Redundant paths should not share physical infrastructure
- Control plane redundancy is as important as data plane redundancy

DOCUMENT D2:
Network Device Roles and Classification
Properly classifying device roles helps with troubleshooting and change management.
Common device roles:
- Core router: Central backbone connectivity and routing
- Edge router: Internet or WAN connectivity
- Distribution switch: Aggregation and policy enforcement
- Access switch: End-user and device connectivity
- Firewall: Security policy enforcement and segmentation
- Load balancer: Application traffic distribution and health monitoring
- WAN optimizer: Traffic compression and protocol optimization
Criticality classification:
- Tier 1: Failure affects entire network or critical business services
- Tier 2: Failure affects multiple departments or non-critical services
- Tier 3: Failure affects limited number of users or services
Change management implications:
- Tier 1 devices require more rigorous testing and maintenance windows
- Dependencies should be documented and considered during changes
- Backup configurations must be verified before significant changes

DOCUMENT D3:
High-Availability Network Design Patterns
Resilient networks require both redundancy and proper failover mechanisms.
Key high-availability patterns:
1. Active/Standby: Primary device with backup that takes over on failure
2. Active/Active: Load shared across multiple devices with failover capability
3. N+1 redundancy: Additional capacity beyond what's required for normal operation
4. Distributed systems: Services spread across multiple devices with no single point of failure
Implementation technologies:
- Routing redundancy: HSRP, VRRP, GLBP for default gateway redundancy
- Link redundancy: Equal-cost multi-path, port channels, spanning tree
- Control plane redundancy: NSF (Non-Stop Forwarding), SSO (Stateful Switchover)
- Device redundancy: Clustered systems, chassis redundancy, geographic distribution
Recovery metrics:
- RTO (Recovery Time Objective): Maximum acceptable time to restore service
- RPO (Recovery Point Objective): Maximum acceptable data loss during recovery
- MTTR (Mean Time To Repair): Average time to restore service after failure

# KNOWLEDGE BASE DOCUMENTS #

DOCUMENT K1:
Network Protocol Troubleshooting Quick Reference
Common protocols and troubleshooting approaches:
1. TCP/IP:
   - Check IP addressing and subnet configuration
   - Verify routing tables and default gateways
   - Test with ping, traceroute, and packet captures
   - Common issues: Address conflicts, fragmentation, MTU issues
2. DNS:
   - Verify DNS server configuration
   - Test resolution with nslookup or dig
   - Check for latency or timeouts
   - Common issues: Incorrect DNS servers, cache poisoning, zone transfer failures
3. DHCP:
   - Verify DHCP server availability
   - Check IP address pool exhaustion
   - Test with 'ipconfig /release' and 'ipconfig /renew'
   - Common issues: Address conflicts, unauthorized DHCP servers, misconfigured scopes
4. BGP:
   - Verify neighbor relationships
   - Check for route advertisements and withdrawals
   - Review AS path and route selection
   - Common issues: Peer misconfigurations, route flapping, policy errors
5. OSPF:
   - Verify neighbor adjacencies
   - Check for area configuration consistency
   - Review LSA database
   - Common issues: Area mismatches, authentication failures, MTU mismatches

DOCUMENT K2:
Network Security Best Practices
Essential security measures for network infrastructure:
1. Device hardening:
   - Disable unused services and ports
   - Implement strong password policies
   - Use encrypted management protocols (SSH, HTTPS)
   - Regular patching and firmware updates
2. Access control:
   - Implement principle of least privilege
   - Use AAA (Authentication, Authorization, Accounting)
   - Secure management plane with ACLs
   - Implement role-based access control
3. Traffic filtering:
   - Deploy firewalls at network boundaries
   - Use ACLs for basic traffic filtering
   - Implement deep packet inspection where needed
   - Regularly audit and clean up rule sets
4. Monitoring and detection:
   - Enable logging and send to central collector
   - Implement network behavior analysis
   - Deploy IDS/IPS systems at critical points
   - Regular review of security events
5. Incident response:
   - Documented procedures for common security incidents
   - Ability to quickly implement traffic filtering
   - Regular security drills and testing
   - Forensic data collection capabilities

DOCUMENT K3:
QoS Configuration Guidelines
Quality of Service implementation best practices:
1. Classification methods:
   - Layer 2: 802.1p CoS (3 bits, values 0-7)
   - Layer 3: IP Precedence (3 bits, values 0-7) or DSCP (6 bits, values 0-63)
   - Application recognition: NBAR or deep packet inspection
2. Queuing mechanisms:
   - Priority Queuing (PQ): Strict priority for delay-sensitive traffic
   - Weighted Fair Queuing (WFQ): Balanced bandwidth sharing
   - Class-Based Weighted Fair Queuing (CBWFQ): Class-based bandwidth allocation
   - Low Latency Queuing (LLQ): Combines CBWFQ with priority queue
3. Common service classes:
   - Voice: EF (DSCP 46), strict priority, minimal delay
   - Video: AF41 (DSCP 34), guaranteed bandwidth
   - Critical data: AF31 (DSCP 26), guaranteed bandwidth
   - Best effort: Default (DSCP 0), remaining bandwidth
4. Implementation guidelines:
   - Classify and mark traffic as close to the source as possible
   - Be consistent with classifications across the network
   - Don't assign more than 33% of link bandwidth to the priority queue, to avoid starving other classes
   - Monitor effectiveness with performance metrics

# INCIDENT RESOLUTION DOCUMENTS #

DOCUMENT I1:
Incident Severity Classification and Response
Standard incident severity levels and response expectations:
1. Critical (Severity 1):
   - Definition: Complete outage of a critical business service
   - Response time: Immediate (15 minutes or less)
   - Resolution goal: 4 hours or less
   - Escalation: Automatic to senior engineers and management
   - Communication: Hourly updates to stakeholders
2. High (Severity 2):
   - Definition: Significant degradation of critical service or complete outage of non-critical service
   - Response time: 30 minutes or less
   - Resolution goal: 8 hours or less
   - Escalation: To senior engineers after 2 hours if unresolved
   - Communication: Updates every 2 hours
3. Medium (Severity 3):
   - Definition: Minor service degradation or issue affecting small user group
   - Response time: 2 hours or less
   - Resolution goal: 24 hours or less
   - Escalation: To senior engineers after 8 hours if unresolved
   - Communication: Daily updates
4. Low (Severity 4):
   - Definition: Minimal impact, often a single user or cosmetic issue
   - Response time: Next business day
   - Resolution goal: Within 5 business days
   - Escalation: Based on SLA adherence
   - Communication: As needed or requested

DOCUMENT I2:
Root Cause Analysis Methodology
Structured approach to identifying incident root causes:
1. Data collection phase:
   - Gather all logs and monitoring data
   - Document timeline of events
   - Collect configuration files and changes
   - Interview involved personnel
2. Analysis techniques:
   - Timeline analysis: Map all events chronologically
   - Change analysis: Review recent changes prior to incident
   - Correlation analysis: Identify patterns across multiple data sources
   - The "5 Whys" technique: Repeatedly ask why to drill down to root cause
3. Categorization framework:
   - Technology issues: Hardware failures, software bugs, capacity limits
   - Process issues: Inadequate procedures, missed steps, poor communication
   - People issues: Training gaps, human error, resource constraints
4. Documentation requirements:
   - Incident summary and impact
   - Timeline of key events
   - Root cause statement (specific, factual, and actionable)
   - Contributing factors
   - Recommendations to prevent recurrence

DOCUMENT I3:
Incident Communication Templates
Standardized communication formats for effective incident management:
1. Initial notification:
   - Incident identifier and timestamp
   - Brief description of the issue
   - Known impact (services and users affected)
   - Current status (investigating, identified, in progress)
   - Next update timing
2. Status update:
   - Reference to incident identifier
   - Current status and progress
   - Actions taken since last update
   - Current impact assessment
   - Estimated time to resolution (if known)
   - Next update timing
3. Resolution notification:
   - Reference to incident identifier
   - Confirmation of resolution
   - Brief description of solution implemented
   - Current service status
   - Duration of incident
   - Follow-up activities planned
4. Post-incident summary:
   - Incident overview and duration
   - Root cause and contributing factors
   - Resolution steps taken
   - Preventive measures identified
   - Lessons learned and recommendations

Writing network_knowledge_base.txt


### After executing all the cells, we can now run the demo to test our AI application

In [11]:
# Run the demo: launch the Gradio app script as a separate Python process via
# the shell, streaming its log output into this cell.
# NOTE(review): assumes update_gradio_app.py is present in the current working
# directory (the notebook earlier does `%cd agentic_rag-mcp_system`) — confirm
# the cells that write the source files have been executed first.
!python update_gradio_app.py

2025-03-26 06:24:58.917166: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1742970298.937667    6670 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1742970298.943921    6670 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-03-26 06:24:58.964593: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2025-03-26 06:25:01,654 - gradio_app - INFO - Loading TinyLlama model for LangGraph...
2025-03-26 06:25:02,290 - accelerate.u