# Self-Adapting Infrastructure Recovery DevOps Agent

This notebook demonstrates how to build an AI agent that can reason about infrastructure issues and adapt its recovery strategy based on real-time feedback, moving beyond predetermined action sequences.

## Import the Required Packages

In [None]:
# Install required packages from requirements.txt
%pip install -r requirements.txt

In [1]:
import os
import json
import random
from typing import Annotated
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv
from IPython.display import display, HTML

from semantic_kernel.agents import ChatCompletionAgent, ChatHistoryAgentThread
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents import FunctionCallContent, FunctionResultContent, StreamingTextContent
from semantic_kernel.functions import kernel_function

## Creating the Infrastructure Recovery Plugin

This plugin simulates infrastructure monitoring and recovery operations. Unlike predetermined scripts, our agent will:
1. Check system status dynamically
2. Execute recovery actions based on current state
3. Verify results and adapt strategy if needed
4. Handle unexpected outcomes by replanning
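
The four steps above amount to a check, act, verify loop. As a minimal, non-LLM sketch of that control flow (the names `recover`, `restart`, and `scale` are hypothetical, and in this notebook the model chooses the next action rather than walking a fixed list):

```python
from typing import Callable

def recover(check: Callable[[], str],
            actions: list[Callable[[], str]],
            max_attempts: int = 5) -> list[str]:
    """Try actions in order, re-checking state before each one."""
    log: list[str] = []
    for attempt, action in enumerate(actions[:max_attempts], start=1):
        if check() == "healthy":
            break  # verified healthy, so stop acting
        log.append(f"attempt {attempt}: {action()}")
    log.append(f"final state: {check()}")
    return log

# Toy system used only for illustration
state = {"status": "crashed"}

def restart() -> str:
    return "FAILURE: restart did not help"

def scale() -> str:
    state["status"] = "healthy"
    return "SUCCESS: scaled resources"

log = recover(lambda: state["status"], [restart, scale, restart])
for line in log:
    print(line)
```

The third action is never executed because the state check before it already reports `healthy`.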

In [2]:
class InfrastructureRecoveryPlugin:
    """Infrastructure monitoring and recovery operations plugin."""
    
    def __init__(self):
        # Simulate infrastructure state
        self.system_state = {
            "pods": {
                "web-app-pod": {"status": "crashed", "restarts": 0, "memory_usage": "high"},
                "api-pod": {"status": "running", "restarts": 1, "memory_usage": "normal"},
                "db-pod": {"status": "pending", "restarts": 0, "memory_usage": "low"}
            },
            "services": {
                "web-service": {"status": "unhealthy", "endpoint_reachable": False},
                "api-service": {"status": "healthy", "endpoint_reachable": True},
                "db-service": {"status": "degraded", "endpoint_reachable": True}
            },
            "volumes": {
                "app-volume": {"status": "mounted", "usage": "85%"},
                "db-volume": {"status": "corrupted", "usage": "60%"}
            }
        }
        self.action_history = []
    
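    @kernel_function(description="Get current status of all infrastructure components")
    def check_system_status(self) -> Annotated[str, "Returns current infrastructure status"]:
        """Check the current state of pods, services, and volumes."""
        # Derive overall health from component state so the report reflects recovery progress
        healthy_states = {"running", "healthy", "mounted"}
        all_healthy = all(
            component["status"] in healthy_states
            for group in self.system_state.values()
            for component in group.values()
        )
        status_report = {
            "timestamp": "2025-08-26T10:30:00Z",  # fixed timestamp for the simulation
            "overall_health": "healthy" if all_healthy else "degraded",
            "components": self.system_state
        }
        return json.dumps(status_report, indent=2)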
    
    @kernel_function(description="Restart a specific pod")
    def restart_pod(self, pod_name: Annotated[str, "Name of the pod to restart"]) -> Annotated[str, "Result of pod restart operation"]:
        """Restart a pod and return the result."""
        self.action_history.append(f"restart_pod({pod_name})")
        
        if pod_name not in self.system_state["pods"]:
            return f"ERROR: Pod '{pod_name}' not found"
        
        pod = self.system_state["pods"][pod_name]
        pod["restarts"] += 1
        
        # Simulate different outcomes based on pod state
        if pod_name == "web-app-pod" and pod["memory_usage"] == "high":
            # High memory usage might cause restart to fail
            if random.random() > 0.3:  # 70% success rate
                pod["status"] = "running"
                pod["memory_usage"] = "normal"
                return f"SUCCESS: Pod '{pod_name}' restarted successfully (attempt {pod['restarts']})"
            else:
                pod["status"] = "crashed"
                return f"FAILURE: Pod '{pod_name}' failed to restart due to memory issues (attempt {pod['restarts']})"
        
        # Normal restart
        pod["status"] = "running"
        return f"SUCCESS: Pod '{pod_name}' restarted successfully (attempt {pod['restarts']})"
    
    @kernel_function(description="Delete and recreate a volume")
    def recreate_volume(self, volume_name: Annotated[str, "Name of the volume to recreate"]) -> Annotated[str, "Result of volume recreation"]:
        """Delete and recreate a volume."""
        self.action_history.append(f"recreate_volume({volume_name})")
        
        if volume_name not in self.system_state["volumes"]:
            return f"ERROR: Volume '{volume_name}' not found"
        
        volume = self.system_state["volumes"][volume_name]
        
        if volume["status"] == "corrupted":
            volume["status"] = "mounted"
            volume["usage"] = "10%"
            return f"SUCCESS: Volume '{volume_name}' recreated and mounted"
        else:
            return f"INFO: Volume '{volume_name}' was already healthy, no action needed"
    
    @kernel_function(description="Check if a service is responding")
    def check_service_health(self, service_name: Annotated[str, "Name of the service to check"]) -> Annotated[str, "Service health status"]:
        """Check if a service endpoint is reachable and healthy."""
        self.action_history.append(f"check_service_health({service_name})")
        
        if service_name not in self.system_state["services"]:
            return f"ERROR: Service '{service_name}' not found"
        
        service = self.system_state["services"][service_name]
        
        # Update service status based on pod dependencies
        if service_name == "web-service":
            web_pod = self.system_state["pods"]["web-app-pod"]
            if web_pod["status"] == "running":
                service["status"] = "healthy"
                service["endpoint_reachable"] = True
            else:
                service["status"] = "unhealthy"
                service["endpoint_reachable"] = False
        
        return json.dumps({
            "service": service_name,
            "status": service["status"],
            "endpoint_reachable": service["endpoint_reachable"],
            "timestamp": "2025-08-26T10:35:00Z"
        }, indent=2)
    
    @kernel_function(description="Scale a pod to increase resources")
    def scale_pod_resources(self, pod_name: Annotated[str, "Name of the pod to scale"], 
                           memory_limit: Annotated[str, "New memory limit (e.g., '2Gi')"]) -> Annotated[str, "Result of scaling operation"]:
        """Scale pod resources to handle high memory usage."""
        self.action_history.append(f"scale_pod_resources({pod_name}, {memory_limit})")
        
        if pod_name not in self.system_state["pods"]:
            return f"ERROR: Pod '{pod_name}' not found"
        
        pod = self.system_state["pods"][pod_name]
        
        if pod["memory_usage"] == "high":
            pod["memory_usage"] = "normal"
            pod["status"] = "running"
            return f"SUCCESS: Pod '{pod_name}' scaled to {memory_limit}, memory usage normalized"
        else:
            return f"INFO: Pod '{pod_name}' memory usage is already normal"
    
    @kernel_function(description="Get history of actions performed")
    def get_action_history(self) -> Annotated[str, "List of actions performed so far"]:
        """Get the history of recovery actions performed."""
        return json.dumps({
            "actions_performed": self.action_history,
            "total_actions": len(self.action_history)
        }, indent=2)
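
Because `restart_pod` relies on `random.random()`, each run of the notebook can take a different recovery path. One way to make a simulated failure rate reproducible is a seeded `random.Random` instance. The sketch below is an assumption about how that could look, not part of the plugin: `simulate_restart` and `run_trial` are hypothetical helpers, and `rng.random() < 0.7` gives the same 70% success probability as the plugin's `random.random() > 0.3`.

```python
import random

def simulate_restart(rng: random.Random, success_rate: float = 0.7) -> str:
    """Return SUCCESS with probability success_rate, otherwise FAILURE."""
    return "SUCCESS" if rng.random() < success_rate else "FAILURE"

def run_trial(seed: int, attempts: int = 5) -> list[str]:
    """Run a fixed number of simulated restarts from a seeded generator."""
    rng = random.Random(seed)  # private generator; does not touch global random state
    return [simulate_restart(rng) for _ in range(attempts)]

# The same seed always yields the same sequence of outcomes
first = run_trial(seed=42)
second = run_trial(seed=42)
print(first == second)
```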

## Creating the Azure OpenAI Client

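We'll use an Azure OpenAI chat deployment (`gpt-4o-mini` by default, configurable through `AZURE_OPENAI_DEPLOYMENT_NAME`). Adaptive infrastructure recovery benefits from a model with strong reasoning and tool-calling capabilities.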

In [3]:
load_dotenv()

# Create Azure OpenAI chat completion service
chat_completion_service = AzureChatCompletion(
    deployment_name=os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o-mini"),
    endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT"),
    api_version=os.environ.get("AZURE_OPENAI_API_VERSION", "2024-02-01"),
    api_key=os.environ.get("AZURE_OPENAI_API_KEY")
)

# Alternative: Azure AD Authentication (uncomment to use)
# credential = DefaultAzureCredential()
# def get_azure_ad_token():
#     token = credential.get_token("https://cognitiveservices.azure.com/.default")
#     return token.token
#
# chat_completion_service = AzureChatCompletion(
#     deployment_name=os.environ.get("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o-mini"),
#     endpoint=os.environ.get("AZURE_OPENAI_ENDPOINT"),
#     api_version=os.environ.get("AZURE_OPENAI_API_VERSION", "2024-02-01"),
#     ad_token=get_azure_ad_token()
# )
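
A missing environment variable only surfaces when the service is constructed or first called. A small pre-flight check can report the gap earlier. This is a sketch under assumptions: `REQUIRED_SETTINGS` and `missing_settings` are hypothetical names, and the API key is only required when you use key-based rather than Azure AD authentication.

```python
from typing import Mapping

# Settings assumed to be required for key-based authentication
REQUIRED_SETTINGS = ("AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_API_KEY")

def missing_settings(env: Mapping[str, str]) -> list[str]:
    """Return the names of required settings that are absent or empty."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]

# Example with a plain dict; in the notebook you would pass os.environ
example_env = {
    "AZURE_OPENAI_ENDPOINT": "https://example.invalid",
    "AZURE_OPENAI_API_KEY": "",
}
print(missing_settings(example_env))  # ['AZURE_OPENAI_API_KEY']
```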

## Creating the Self-Adapting DevOps Agent

This agent is designed with instructions that emphasize adaptive reasoning and strategy adjustment based on action results.

In [4]:
# Create the self-adapting DevOps agent
agent = ChatCompletionAgent(
    service=chat_completion_service,
    name="DevOpsRecoveryAgent",
    instructions="""You are a self-adapting infrastructure recovery agent. Your key capabilities:

1. ADAPTIVE REASONING: Always check system status first, then reason about the best recovery approach
2. RESULT-BASED STRATEGY: After each action, verify the result and adjust your next steps accordingly
3. FAILURE HANDLING: If an action fails, analyze why and try alternative approaches
4. DEPENDENCY AWARENESS: Consider how components depend on each other (e.g., services depend on pods)
5. PROGRESSIVE RECOVERY: Start with least disruptive actions, escalate only when needed

Recovery Strategy Guidelines:
- Always start by checking system status to understand current state
- For crashed pods with high memory usage, try scaling resources before restarting
- For corrupted volumes, recreate them but be aware this may affect dependent pods
- After any action, verify the result and check service health
- If predetermined approaches fail, reason about alternative solutions
- Keep track of what you've tried to avoid infinite loops

Respond with clear explanations of your reasoning and next steps.""",
    plugins=[InfrastructureRecoveryPlugin()],
)
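
The instructions ask the model to keep track of what it has tried. As a complementary safeguard outside the prompt, repeated actions can be detected from the plugin's `action_history` strings. A minimal sketch, assuming a hypothetical helper `repeated_actions` and an arbitrary limit of two repetitions:

```python
from collections import Counter

def repeated_actions(history: list[str], limit: int = 2) -> list[str]:
    """Return actions that appear more than `limit` times in the history."""
    counts = Counter(history)
    return sorted(action for action, n in counts.items() if n > limit)

# History entries use the same "name(args)" format the plugin records
history = [
    "restart_pod(web-app-pod)",
    "restart_pod(web-app-pod)",
    "check_service_health(web-service)",
    "restart_pod(web-app-pod)",
]
print(repeated_actions(history))  # ['restart_pod(web-app-pod)']
```

A caller could stop the run or ask for human input once this list is non-empty.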

## Running the Self-Adapting Agent

Let's test the agent with different infrastructure recovery scenarios to see how it adapts its strategy based on real-time feedback.

In [5]:
# Test scenarios for the self-adapting agent
user_inputs = [
    "Our web application is not responding. Please investigate and fix the issue.",
    "The fix didn't work as expected. What's your next strategy?",
    "We're also seeing database connectivity issues. How do you adapt your recovery plan?"
]

async def run_adaptive_recovery():
    thread: ChatHistoryAgentThread | None = None

    for i, user_input in enumerate(user_inputs, 1):
        html_output = (
            f"<div style='margin-bottom:15px; padding:10px; border-left:4px solid #0066cc; background:#f8f9fa'>"
            f"<div style='font-weight:bold; color:#0066cc'>Scenario {i}:</div>"
            f"<div style='margin-left:20px; margin-top:5px'>{user_input}</div></div>"
        )

        agent_name = None
        full_response: list[str] = []
        function_calls: list[str] = []

        # Buffer for streaming function calls
        current_function_name = None
        argument_buffer = ""

        async for response in agent.invoke_stream(
            messages=user_input,
            thread=thread,
        ):
            thread = response.thread
            agent_name = response.name
            content_items = list(response.items)

            for item in content_items:
                if isinstance(item, FunctionCallContent):
                    if item.function_name:
                        current_function_name = item.function_name

                    if isinstance(item.arguments, str):
                        argument_buffer += item.arguments
                elif isinstance(item, FunctionResultContent):
                    # Show function call with arguments
                    if current_function_name:
                        formatted_args = argument_buffer.strip()
                        try:
                            parsed_args = json.loads(formatted_args)
                            formatted_args = json.dumps(parsed_args, indent=2)
                        except json.JSONDecodeError:
                            # Arguments were not valid JSON; show the raw buffer instead
                            pass

                        function_calls.append(f"🔧 Executing: {current_function_name}({formatted_args})")
                        current_function_name = None
                        argument_buffer = ""

                    # Show function result
                    function_calls.append(f"📋 Result:\n{item.result}\n")
                elif isinstance(item, StreamingTextContent) and item.text:
                    full_response.append(item.text)

        # Display function calls in an expandable section
        if function_calls:
            html_output += (
                "<div style='margin-bottom:10px'>"
                "<details open>"
                "<summary style='cursor:pointer; font-weight:bold; color:#28a745; margin-bottom:10px;'>🛠️ Recovery Actions & Results</summary>"
                "<div style='margin:10px; padding:15px; background-color:#f1f8ff; "
                "border:1px solid #c6e2ff; border-radius:6px; white-space:pre-wrap; font-family:monospace; font-size:13px; color:#24292e;'>"
                f"{chr(10).join(function_calls)}"
                "</div></details></div>"
            )

        # Display agent reasoning and response
        html_output += (
            "<div style='margin-bottom:20px; padding:15px; background-color:#fff; border:1px solid #e1e4e8; border-radius:6px'>"
            f"<div style='font-weight:bold; color:#28a745; margin-bottom:10px'>🤖 {agent_name or 'DevOps Agent'} Analysis:</div>"
            f"<div style='margin-left:15px; white-space:pre-wrap; line-height:1.5'>{''.join(full_response)}</div></div><hr style='margin:30px 0;'>"
        )

        display(HTML(html_output))

# Run the adaptive recovery scenarios
await run_adaptive_recovery()
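
The streaming loop above buffers function-call arguments because they can arrive as partial JSON fragments spread over several chunks. The buffering idea can be shown in isolation with plain tuples standing in for the streamed items. `assemble_call` is a hypothetical helper, not part of Semantic Kernel:

```python
import json
from typing import Optional, Union

def assemble_call(
    fragments: list[tuple[Optional[str], str]],
) -> tuple[Optional[str], Union[dict, str]]:
    """Join (name, argument_fragment) pairs into one call.

    Returns the parsed arguments, or the raw buffer if it is not valid JSON.
    """
    name: Optional[str] = None
    buffer = ""
    for fragment_name, fragment_args in fragments:
        if fragment_name:
            name = fragment_name  # the name usually arrives with the first chunk
        buffer += fragment_args
    try:
        return name, json.loads(buffer)
    except json.JSONDecodeError:
        return name, buffer

fragments = [("restart_pod", ""), (None, '{"pod_na'), (None, 'me": "web-app-pod"}')]
print(assemble_call(fragments))  # ('restart_pod', {'pod_name': 'web-app-pod'})
```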

## Key Differences from Predetermined Scripts

This self-adapting agent demonstrates several improvements over traditional predetermined remediation scripts:

### 1. **Dynamic Assessment**
- Always checks current system state before taking action
- Adapts strategy based on real-time conditions

### 2. **Result-Driven Decision Making**
- Verifies the outcome of each action
- Adjusts next steps based on success/failure
- Avoids continuing with a failed strategy

### 3. **Intelligent Failure Handling**
- Falls back to resource scaling when a restart fails because of memory pressure
- Considers alternative approaches when the initial strategy fails
- Consults the action history to avoid repeating failed attempts

### 4. **Dependency Awareness**
- Understands relationships between pods, services, and volumes
- Considers cascade effects of recovery actions
- Verifies dependent services after fixing underlying components
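
Dependency awareness can also be made explicit in code. The sketch below uses a hypothetical `DEPENDENCIES` map to find which services to re-verify after a component changes. Only the `web-service` to `web-app-pod` link comes from the plugin above; the other entries are assumptions for illustration.

```python
# Service -> components it depends on (only the web-service entry is from the plugin)
DEPENDENCIES = {
    "web-service": ["web-app-pod"],
    "api-service": ["api-pod"],
    "db-service": ["db-pod", "db-volume"],
}

def services_to_verify(component: str) -> list[str]:
    """Return the services that depend on the given pod or volume."""
    return sorted(
        service for service, deps in DEPENDENCIES.items() if component in deps
    )

print(services_to_verify("db-volume"))  # ['db-service']
```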

### 5. **Progressive Escalation**
- Starts with least disruptive actions (resource scaling)
- Escalates to more disruptive actions (restarts, volume recreation) only when needed
- Aims to keep the rest of the system stable throughout the recovery process
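
Progressive escalation can be sketched as an ordered ladder of the plugin's actions. The ordering below is an assumption that mirrors the bullets above; `ESCALATION_LADDER` and `next_action` are hypothetical names, and in the notebook the model makes this choice from its instructions.

```python
from typing import Optional

# Least disruptive first; the ranking is an illustrative assumption
ESCALATION_LADDER = ["scale_pod_resources", "restart_pod", "recreate_volume"]

def next_action(already_tried: set[str]) -> Optional[str]:
    """Return the least disruptive action not yet tried, or None if exhausted."""
    for action in ESCALATION_LADDER:
        if action not in already_tried:
            return action
    return None

print(next_action(set()))                    # scale_pod_resources
print(next_action({"scale_pod_resources"}))  # restart_pod
```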

## Next Steps

To extend this agent for production use:

1. **Integration with Real Infrastructure APIs** (Kubernetes, Docker, cloud providers)
2. **Enhanced Monitoring** (metrics, logs, alerts integration)
3. **Learning from Outcomes** (ML-based improvement of recovery strategies)
4. **Safety Mechanisms** (approval workflows for destructive actions)
5. **Multi-Environment Support** (dev, staging, production with different risk profiles)
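
As one possible shape for the safety mechanisms in item 4, destructive actions can be routed through an approval callback before they run. This is a sketch only: `DESTRUCTIVE_ACTIONS`, `run_with_approval`, and the `approve` callback are hypothetical, and a real workflow would likely involve a ticketing or chat-ops system.

```python
from typing import Callable

# Actions assumed to need sign-off; recreating a volume discards its data
DESTRUCTIVE_ACTIONS = {"recreate_volume"}

def run_with_approval(
    action_name: str,
    action: Callable[[], str],
    approve: Callable[[str], bool],
) -> str:
    """Run the action, but block destructive ones unless approve() returns True."""
    if action_name in DESTRUCTIVE_ACTIONS and not approve(action_name):
        return f"BLOCKED: '{action_name}' requires approval"
    return action()

# Example: a policy that denies every request
result = run_with_approval(
    "recreate_volume",
    lambda: "SUCCESS: volume recreated",
    approve=lambda name: False,
)
print(result)  # BLOCKED: 'recreate_volume' requires approval
```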

The core principle remains: **reason about each step, verify results, and adapt strategy based on real-world feedback**.