# Phase1: ReAct Investigation

## Overview

Phase1 executes the Investigation Plan generated in the Plan Phase using a LangGraph ReAct framework. It investigates volume I/O issues by executing tools sequentially, analyzing the results, and producing a Fix Plan.

### Key Components

- **LangGraph StateGraph**: Manages the flow of the investigation process
- **SerialToolNode**: Executes tools sequentially based on the Investigation Plan
- **Knowledge Graph Tools**: Tools for querying and analyzing the Knowledge Graph
- **Kubernetes Tools**: Tools for interacting with the Kubernetes API
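
To make the idea of sequential execution concrete, here is a minimal sketch of running tool calls one at a time. The `run_tools_serially` function, the tool-call dictionary shape, and the two stand-in tools are assumptions made for illustration; they are not the project's actual SerialToolNode or Knowledge Graph tools.

```python
from typing import Any, Callable, Dict, List

# Hypothetical stand-in tools; the real Knowledge Graph tools are not shown here.
def kg_get_entity_info(entity_type: str, id: str) -> str:
    return f"{entity_type} {id}: status=Running"

def kg_get_all_issues(severity: str = "all") -> str:
    return f"issues with severity={severity}: none"

TOOLS: Dict[str, Callable[..., str]] = {
    "kg_get_entity_info": kg_get_entity_info,
    "kg_get_all_issues": kg_get_all_issues,
}

def run_tools_serially(tool_calls: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Run each requested tool in order, one at a time, collecting results."""
    results = []
    for call in tool_calls:
        tool = TOOLS.get(call["name"])
        if tool is None:
            # Report unknown tools instead of raising, so later steps still run
            output = f"error: unknown tool {call['name']}"
        else:
            try:
                output = tool(**call.get("args", {}))
            except Exception as exc:  # a failing tool should not abort the plan
                output = f"error: {exc}"
        results.append({"name": call["name"], "output": output})
    return results

calls = [
    {"name": "kg_get_entity_info",
     "args": {"entity_type": "Pod", "id": "gnode:Pod:default/test-pod"}},
    {"name": "kg_get_all_issues", "args": {"severity": "critical"}},
]
for result in run_tools_serially(calls):
    print(f"{result['name']}: {result['output']}")
```

Running the calls strictly in order keeps each result attributable to a single plan step, at the cost of not overlapping slow tools.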

### Inputs and Outputs

- **Inputs**: 
  - Investigation Plan from Plan Phase
  - Knowledge Graph and collected information from Phase0
  - Pod name, namespace, volume path
- **Outputs**: 
  - Fix Plan with identified root causes and remediation steps
  - Skip Phase2 flag (true if no issues detected or manual intervention required)
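
The Investigation Plan arrives as plain text with one pipe-delimited line per step (see the mock plan later in this notebook). As an illustration of how such a line could be split into its parts, here is a small sketch. `PlanStep` and `parse_plan_step` are hypothetical helpers, not part of the project, and the tool call is kept as raw text rather than parsed.

```python
import re
from typing import Dict, NamedTuple, Optional

class PlanStep(NamedTuple):
    step_id: str            # "1", "2", ... or "F1", "F2" for fallback steps
    description: str
    tool: str               # raw tool call text, left unparsed
    expected: str
    trigger: Optional[str]  # only present on fallback steps

STEP_RE = re.compile(r"^Step (?P<id>\w+): (?P<rest>.+)$")

def parse_plan_step(line: str) -> Optional[PlanStep]:
    """Parse one 'Step N: ... | Tool: ... | Expected: ...' line, or return None."""
    match = STEP_RE.match(line.strip())
    if match is None:
        return None
    parts = [part.strip() for part in match.group("rest").split(" | ")]
    fields: Dict[str, str] = {}
    for part in parts[1:]:
        key, _, value = part.partition(": ")
        fields[key] = value
    return PlanStep(
        step_id=match.group("id"),
        description=parts[0],
        tool=fields.get("Tool", ""),
        expected=fields.get("Expected", ""),
        trigger=fields.get("Trigger"),
    )

line = ("Step 7: Check for issues | Tool: kg_get_all_issues(severity='primary') "
        "| Expected: Primary issues in the system")
step = parse_plan_step(line)
print(step.step_id, "->", step.tool)
```

Lines that are not steps, such as the `Target:` header, yield `None` and can simply be skipped by the caller.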

In [1]:
# Import necessary libraries
import asyncio
import json
import sys
from typing import Dict, List, Any

# Make the parent directory importable so the mock modules under tests/ resolve
sys.path.append('../')

# Import mock data for demonstration
from tests.mock_knowledge_graph import create_mock_knowledge_graph
from tests.mock_kubernetes_data import get_mock_kubernetes_data
from tests.mock_system_data import get_mock_system_data

## Mock LangGraph Components

For demonstration purposes, we'll create mock implementations of the LangGraph components used in Phase1.

In [2]:
class MockLangGraphStateGraph:
    """
    Mock implementation of LangGraph StateGraph for demonstration
    """
    
    def __init__(self, collected_info, phase="phase1", config_data=None):
        """
        Initialize the mock LangGraph StateGraph
        
        Args:
            collected_info: Pre-collected diagnostic information from Phase0
            phase: Phase name ("phase1" or "phase2")
            config_data: Configuration data (optional)
        """
        self.collected_info = collected_info
        self.phase = phase
        self.config_data = config_data or {}
        self.knowledge_graph = collected_info.get('knowledge_graph')
        print(f"Initializing LangGraph StateGraph for {phase}...")
    
    async def ainvoke(self, query, config=None):
        """
        Invoke the graph asynchronously
        
        Args:
            query: Query to send to the graph
            config: Configuration for the graph execution
            
        Returns:
            Dict[str, Any]: Graph execution result
        """
        print(f"Running LangGraph for {self.phase}...")
        
        # Simulate graph execution with delay
        await asyncio.sleep(2)
        
        # Generate a response based on the phase
        if self.phase == "phase1":
            response = self._generate_phase1_response()
        else:
            response = self._generate_phase2_response()
        
        return response
    
    def _generate_phase1_response(self):
        """
        Generate a mock response for Phase1
        
        Returns:
            Dict[str, Any]: Mock Phase1 response
        """
        # Get issues from the knowledge graph
        issues = self.knowledge_graph.issues if self.knowledge_graph else []
        
        # Generate a fix plan based on the issues
        fix_plan = self._generate_fix_plan(issues)
        
        return {
            "messages": [
                {"role": "user", "content": "Execute the Investigation Plan"},
                {"role": "assistant", "content": fix_plan}
            ]
        }
    
    def _generate_phase2_response(self):
        """
        Generate a mock response for Phase2
        
        Returns:
            Dict[str, Any]: Mock Phase2 response
        """
        return {
            "messages": [
                {"role": "user", "content": "Execute the Fix Plan"},
                {"role": "assistant", "content": "Fix Plan executed successfully. All issues have been resolved."}
            ]
        }
    
    def _generate_fix_plan(self, issues):
        """
        Generate a fix plan based on the issues
        
        Args:
            issues: List of issues from the knowledge graph
            
        Returns:
            str: Generated fix plan
        """
        if not issues:
            return """Summary Finding: No issues detected in the system.
Evidence: Knowledge Graph analysis shows no errors or warnings in the system.
Advice: Continue monitoring the system for any future issues.
SKIP_PHASE2: YES"""
        
        # Prefer the first critical issue; otherwise fall back to the first issue
        primary_issue = next(
            (issue for issue in issues if issue.get('severity') == 'critical'),
            issues[0]
        )
        
        # Dispatch on the issue category; a missing category falls through to the generic plan
        category = primary_issue.get('category')
        if category == 'filesystem':
            return self._generate_filesystem_fix_plan(primary_issue)
        elif category == 'hardware':
            return self._generate_hardware_fix_plan(primary_issue)
        else:
            return self._generate_generic_fix_plan(primary_issue)
    
    def _generate_filesystem_fix_plan(self, issue):
        """
        Generate a fix plan for filesystem issues
        
        Args:
            issue: Issue to generate a fix plan for
            
        Returns:
            str: Generated fix plan
        """
        return f"""# Root Cause Analysis

After executing the Investigation Plan, I have identified the root cause of the volume I/O errors:

## Primary Issue
- **Issue**: {issue['message']}
- **Severity**: {issue['severity']}
- **Entity**: {issue['entity_type']} ({issue['entity_id']})
- **Details**: {issue['details']}

## Evidence
1. XFS filesystem corruption detected in the volume's metadata
2. Kernel logs show XFS_CORRUPT_INODES errors
3. I/O errors reported by the container runtime

## Contributing Factors
- Possible hardware issues with the underlying drive
- Multiple read failures recorded on the drive

# Fix Plan

The following steps should be taken to resolve the issue:

1. Unmount the corrupted filesystem
2. Run xfs_repair with the -L option to fix the filesystem corruption
3. Check the drive health with SMART tools
4. Remount the filesystem
5. Restart the affected pod

## Commands to Execute

```bash
# Step 1: Unmount the filesystem
kubectl delete pod test-pod -n default
umount /var/lib/kubelet/pods/pod-123-456/volumes/kubernetes.io~csi/test-pv/mount

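# Step 2: Run xfs_repair (-L zeroes the log and can discard recent metadata updates; use it only if the log cannot be replayed by mounting)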
xfs_repair -L /dev/mapper/volume-123-456

# Step 3: Check drive health
smartctl -a /dev/sda

# Step 4: Remount the filesystem
mount /dev/mapper/volume-123-456 /var/lib/kubelet/pods/pod-123-456/volumes/kubernetes.io~csi/test-pv/mount

# Step 5: Restart the pod
kubectl create -f pod-definition.yaml
```
"""
    
    def _generate_hardware_fix_plan(self, issue):
        """
        Generate a fix plan for hardware issues
        
        Args:
            issue: Issue to generate a fix plan for
            
        Returns:
            str: Generated fix plan
        """
        return f"""# Root Cause Analysis

After executing the Investigation Plan, I have identified the root cause of the volume I/O errors:

## Primary Issue
- **Issue**: {issue['message']}
- **Severity**: {issue['severity']}
- **Entity**: {issue['entity_type']} ({issue['entity_id']})
- **Details**: {issue['details']}

## Evidence
1. Multiple I/O errors detected on the drive
2. SMART diagnostics show warning signs for the drive
3. Error logs indicate hardware degradation

## Contributing Factors
- Drive hardware failure
- Loose connections
- Controller issues

# Fix Plan

The following steps should be taken to resolve the issue:

1. Backup important data from the drive
2. Check drive connections
3. Run extended SMART diagnostics
4. If errors persist, replace the drive
5. Restore data to the new drive

## Commands to Execute

```bash
# Step 1: Backup important data
kubectl delete pod test-pod -n default
mkdir -p /backup
cp -r /var/lib/kubelet/pods/pod-123-456/volumes/kubernetes.io~csi/test-pv/mount/* /backup/

# Step 2: Check drive connections (manual step)

# Step 3: Run extended SMART diagnostics
smartctl -t long /dev/sda
sleep 7200  # Wait for test to complete
smartctl -a /dev/sda

# Step 4: If errors persist, replace the drive (manual step)

# Step 5: Restore data to the new drive
cp -r /backup/* /var/lib/kubelet/pods/pod-123-456/volumes/kubernetes.io~csi/test-pv/mount/
kubectl create -f pod-definition.yaml
```

Note: This issue requires manual intervention to check and potentially replace the drive hardware. Please coordinate with the datacenter team.
SKIP_PHASE2: YES
"""
    
    def _generate_generic_fix_plan(self, issue):
        """
        Generate a generic fix plan
        
        Args:
            issue: Issue to generate a fix plan for
            
        Returns:
            str: Generated fix plan
        """
        return f"""# Root Cause Analysis

After executing the Investigation Plan, I have identified the root cause of the volume I/O errors:

## Primary Issue
- **Issue**: {issue['message']}
- **Severity**: {issue['severity']}
- **Entity**: {issue['entity_type']} ({issue['entity_id']})
- **Details**: {issue['details']}

## Evidence
1. Error logs show issues with the system
2. Pod status indicates I/O errors
3. Volume diagnostics confirm the issue

# Fix Plan

The following steps should be taken to resolve the issue:

1. Restart the affected pod
2. Check system logs for further issues
3. Monitor the system for recurrence

## Commands to Execute

```bash
# Step 1: Restart the pod
kubectl delete pod test-pod -n default
kubectl create -f pod-definition.yaml

# Step 2: Check system logs
journalctl -u kubelet --since "1 hour ago" | grep "volume"

# Step 3: Monitor the system
kubectl get events -n default --watch
```
"""

## Mock Analysis Phase

Now, let's create a mock implementation of the Analysis Phase that uses our mock LangGraph components.

In [3]:
class MockAnalysisPhase:
    """
    Mock implementation of Analysis Phase for demonstration
    """
    
    def __init__(self, collected_info, config_data=None):
        """
        Initialize the mock analysis phase
        
        Args:
            collected_info: Pre-collected diagnostic information from Phase0
            config_data: Configuration data (optional)
        """
        self.collected_info = collected_info
        self.config_data = config_data or {}
        print("Initializing Analysis Phase...")
    
    async def run_investigation(self, pod_name, namespace, volume_path, investigation_plan, message_list=None):
        """
        Run the investigation based on the Investigation Plan
        
        Args:
            pod_name: Name of the pod with the error
            namespace: Namespace of the pod
            volume_path: Path of the volume with I/O error
            investigation_plan: Investigation Plan generated by the Plan Phase
            message_list: Optional message list for chat mode
            
        Returns:
            tuple: (Analysis result, Updated message list)
        """
        print(f"\nRunning investigation for pod {namespace}/{pod_name} with volume path {volume_path}")
        print(f"\nInvestigation Plan:\n{investigation_plan}")
        
        # Create LangGraph for investigation
        graph = MockLangGraphStateGraph(self.collected_info, phase="phase1", config_data=self.config_data)
        
        # Create query for LangGraph
        query = {"messages": [{"role": "user", "content": f"Execute the Investigation Plan for pod {pod_name} in namespace {namespace} with volume path {volume_path}"}]}
        
        # Run LangGraph
        print("\nExecuting Investigation Plan with LangGraph...")
        response = await graph.ainvoke(query, config={"recursion_limit": 100})
        
        # Extract result
        final_message = response["messages"][-1]["content"]
        
        # Update message list if provided
        if message_list is not None:
            message_list.append({"role": "assistant", "content": final_message})
        
        return final_message, message_list

## Running Phase1: ReAct Investigation

Now let's run the ReAct Investigation phase with our mock implementation.

In [4]:
async def run_analysis_phase_with_plan(pod_name, namespace, volume_path, collected_info, investigation_plan, config_data=None, message_list=None):
    """
    Run Phase 1: ReAct Investigation with pre-collected information as base knowledge
    
    Args:
        pod_name: Name of the pod with the error
        namespace: Namespace of the pod
        volume_path: Path of the volume with I/O error
        collected_info: Pre-collected diagnostic information from Phase0
        investigation_plan: Investigation Plan generated by the Plan Phase
        config_data: Configuration data (optional)
        message_list: Optional message list for chat mode
        
    Returns:
        tuple: (Analysis result, Skip Phase2 flag, Updated message list)
    """
    print("Starting Phase 1: ReAct Investigation with Plan")
    
    # Initialize the analysis phase
    phase = MockAnalysisPhase(collected_info, config_data)
    
    # Run the investigation
    result, message_list = await phase.run_investigation(pod_name, namespace, volume_path, investigation_plan, message_list)
    
    # Check if the result contains the SKIP_PHASE2 marker
    skip_phase2 = "SKIP_PHASE2: YES" in result
    
    # Remove the SKIP_PHASE2 marker from the output if present
    if skip_phase2:
        result = result.replace("SKIP_PHASE2: YES", "").strip()
        print("\nPhase 1 indicated Phase 2 should be skipped")
    
    return result, skip_phase2, message_list

In [5]:
# Create mock collected info from Phase0
def create_mock_collected_info():
    knowledge_graph = create_mock_knowledge_graph()
    kubernetes_data = get_mock_kubernetes_data()
    system_data = get_mock_system_data()
    
    return {
        "pod_info": kubernetes_data.get("pods", {}),
        "pvc_info": kubernetes_data.get("pvcs", {}),
        "pv_info": kubernetes_data.get("pvs", {}),
        "node_info": kubernetes_data.get("nodes", {}),
        "csi_driver_info": kubernetes_data.get("csi_driver", {}),
        "storage_class_info": kubernetes_data.get("storage_classes", {}),
        "system_info": system_data,
        "knowledge_graph_summary": {
            "pod_count": 1,
            "pvc_count": 1,
            "pv_count": 1,
            "node_count": 1,
            "issue_count": len(knowledge_graph.issues)
        },
        "issues": knowledge_graph.issues,
        "knowledge_graph": knowledge_graph
    }

# Create mock investigation plan from Plan Phase
mock_investigation_plan = """
Investigation Plan:
Target: Pod default/test-pod, Volume Path: /var/lib/kubelet/pods/pod-123-456/volumes/kubernetes.io~csi/test-pv/mount
Generated Steps: 8 steps

Step 1: Get pod details | Tool: kg_get_entity_info(entity_type='Pod', id='gnode:Pod:default/test-pod') | Expected: Pod configuration and status
Step 2: Check related PVC | Tool: kg_find_path(source_entity_type='Pod', source_id='gnode:Pod:default/test-pod', target_entity_type='PVC', target_id='*') | Expected: Path from Pod to PVC
Step 3: Get PVC details | Tool: kg_get_entity_info(entity_type='PVC', id='gnode:PVC:default/test-pvc') | Expected: PVC configuration and status
Step 4: Check related PV | Tool: kg_find_path(source_entity_type='PVC', source_id='gnode:PVC:default/test-pvc', target_entity_type='PV', target_id='*') | Expected: Path from PVC to PV
Step 5: Get PV details | Tool: kg_get_entity_info(entity_type='PV', id='gnode:PV:test-pv') | Expected: PV configuration and status
Step 6: Check node status | Tool: kg_get_entity_info(entity_type='Node', id='gnode:Node:worker-1') | Expected: Node status and conditions
Step 7: Check for issues | Tool: kg_get_all_issues(severity='primary') | Expected: Primary issues in the system
Step 8: Analyze issues | Tool: kg_analyze_issues() | Expected: Root cause analysis and patterns

Fallback Steps (if main steps fail):
Step F1: Print Knowledge Graph | Tool: kg_print_graph(include_details=True, include_issues=True) | Expected: Complete system visualization | Trigger: kg_get_entity_info_failed
Step F2: Check system logs | Tool: kubectl_logs(pod_name='test-pod', namespace='default') | Expected: Pod logs for error messages | Trigger: kg_get_all_issues_failed
"""

# Define the target pod, namespace, and volume path
target_pod = "test-pod"
target_namespace = "default"
target_volume_path = "/var/lib/kubelet/pods/pod-123-456/volumes/kubernetes.io~csi/test-pv/mount"

# Define configuration data
config_data = {
    "troubleshoot": {
        "timeout_seconds": 300,
        "interactive_mode": True
    }
}

# Create mock collected info
collected_info = create_mock_collected_info()

# Run the analysis phase
analysis_result, skip_phase2, _ = await run_analysis_phase_with_plan(
    target_pod, target_namespace, target_volume_path, 
    collected_info, mock_investigation_plan, config_data
)

Starting Phase 1: ReAct Investigation with Plan
Initializing Analysis Phase...

Running investigation for pod default/test-pod with volume path /var/lib/kubelet/pods/pod-123-456/volumes/kubernetes.io~csi/test-pv/mount

Investigation Plan:

Investigation Plan:
Target: Pod default/test-pod, Volume Path: /var/lib/kubelet/pods/pod-123-456/volumes/kubernetes.io~csi/test-pv/mount
Generated Steps: 8 steps

Step 1: Get pod details | Tool: kg_get_entity_info(entity_type='Pod', id='gnode:Pod:default/test-pod') | Expected: Pod configuration and status
Step 2: Check related PVC | Tool: kg_find_path(source_entity_type='Pod', source_id='gnode:Pod:default/test-pod', target_entity_type='PVC', target_id='*') | Expected: Path from Pod to PVC
Step 3: Get PVC details | Tool: kg_get_entity_info(entity_type='PVC', id='gnode:PVC:default/test-pvc') | Expected: PVC configuration and status
Step 4: Check related PV | Tool: kg_find_path(source_entity_type='PVC', source_id='gnode:PVC:default/test-pvc', target_ent
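
The cell above relies on top-level `await`, which Jupyter supports. When the same code runs as a plain Python script, the coroutine has to be driven by an event loop, for example with `asyncio.run`. The sketch below shows that pattern with a hypothetical stand-in coroutine, `fake_phase1`, so that it stays self-contained; in practice the call inside `main` would be `run_analysis_phase_with_plan(...)`.

```python
import asyncio
from typing import Tuple

async def fake_phase1(pod_name: str, namespace: str) -> Tuple[str, bool]:
    """Hypothetical stand-in for run_analysis_phase_with_plan."""
    await asyncio.sleep(0)  # yield to the event loop, as a real async call would
    return f"Fix Plan for {namespace}/{pod_name}", False

def main() -> Tuple[str, bool]:
    # asyncio.run creates an event loop, runs the coroutine to completion,
    # and closes the loop afterwards
    return asyncio.run(fake_phase1("test-pod", "default"))

if __name__ == "__main__":
    result, skip_phase2 = main()
    print(result)
    print(f"Skip Phase2: {skip_phase2}")
```

Note that `asyncio.run` raises a `RuntimeError` when called from inside an already running event loop, which is why the notebook cell uses `await` directly instead.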

## Examining the Analysis Result

Let's examine the analysis result from Phase1, which includes the root cause analysis and fix plan.

In [6]:
# Display the analysis result
print(analysis_result)

# Root Cause Analysis

After executing the Investigation Plan, I have identified the root cause of the volume I/O errors:

## Primary Issue
- **Issue**: XFS filesystem corruption detected on volume test-pv
- **Severity**: critical
- **Entity**: System (gnode:System:filesystem)
- **Details**: XFS metadata corruption found during filesystem check. This can lead to I/O errors and data loss.

## Evidence
1. XFS filesystem corruption detected in the volume's metadata
2. Kernel logs show XFS_CORRUPT_INODES errors
3. I/O errors reported by the container runtime

## Contributing Factors
- Possible hardware issues with the underlying drive
- Multiple read failures recorded on the drive

# Fix Plan

The following steps should be taken to resolve the issue:

1. Unmount the corrupted filesystem
2. Run xfs_repair with the -L option to fix the filesystem corruption
3. Check the drive health with SMART tools
4. Remount the filesystem
5. Restart the affected pod

## Commands to Execute

```bash
# Step

## Skip Phase2 Flag

The Skip Phase2 flag indicates whether Phase2 (Remediation) should be skipped. It is set to true when the Fix Plan contains the `SKIP_PHASE2: YES` marker, which is emitted in two cases:

1. No issues are detected in the system
2. The issues require manual intervention and cannot be fixed automatically

In [7]:
# Check the Skip Phase2 flag
print(f"Skip Phase2: {skip_phase2}")

Skip Phase2: False
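
The flag is False here because the filesystem fix plan carries no `SKIP_PHASE2: YES` marker. The marker handling can also be illustrated in isolation. The sketch below uses a hypothetical helper, `split_skip_marker`, that accepts the marker only when it occupies a whole line; this is a stricter variant of the substring test in `run_analysis_phase_with_plan`, offered as an assumption rather than as the project's behavior.

```python
import re
from typing import Tuple

# Match the marker only when it stands alone on a line
SKIP_MARKER_RE = re.compile(r"^[ \t]*SKIP_PHASE2: YES[ \t]*$", re.MULTILINE)

def split_skip_marker(fix_plan: str) -> Tuple[str, bool]:
    """Return the plan without the marker line, plus the skip flag."""
    skip = SKIP_MARKER_RE.search(fix_plan) is not None
    cleaned = SKIP_MARKER_RE.sub("", fix_plan).strip()
    return cleaned, skip

no_issue_plan = (
    "Summary Finding: No issues detected in the system.\n"
    "SKIP_PHASE2: YES"
)
cleaned, skip = split_skip_marker(no_issue_plan)
print(skip)
print(cleaned)
```

Anchoring the pattern to a full line avoids a false positive when the plan text merely mentions the marker inside a sentence.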


## LangGraph Workflow

Phase1 uses a LangGraph StateGraph to manage the flow of the investigation process. The graph is built from three main parts:

1. **call_model**: LLM reasoning node that decides what to do next
2. **tools_condition**: Routing condition that checks whether the LLM requested a tool
3. **serial_tools**: Tool execution node that runs the requested tools one at a time

The flow is as follows:

1. The LLM (**call_model**) analyzes the current state and decides what to do
2. If a tool is requested, **tools_condition** routes to **serial_tools**
3. The **serial_tools** node executes the requested tool and returns the result
4. The result is fed back to the LLM (**call_model**) for further analysis
5. This loop continues until the LLM responds without requesting a tool, at which point its final message is the Fix Plan. The `recursion_limit` passed to `ainvoke` (100 in this notebook) caps the number of steps.
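
The loop described above can be sketched without LangGraph itself. In the following dependency-free sketch, `call_model`, `tools_condition`, and `serial_tools` are plain Python functions that mimic the roles described above. The scripted model, the message format, and the `max_steps` guard are assumptions made for illustration, not the project's implementation or the LangGraph API.

```python
from typing import Any, Dict, List

Message = Dict[str, Any]

def call_model(messages: List[Message]) -> Message:
    """Scripted stand-in for the LLM: request one tool, then finish."""
    tool_results = [m for m in messages if m["role"] == "tool"]
    if not tool_results:
        return {"role": "assistant", "content": "",
                "tool_calls": [{"name": "kg_get_all_issues", "args": {}}]}
    return {"role": "assistant",
            "content": f"Fix Plan based on: {tool_results[-1]['content']}",
            "tool_calls": []}

def tools_condition(messages: List[Message]) -> str:
    """Route to the tool step if the last message requested a tool."""
    return "serial_tools" if messages[-1].get("tool_calls") else "end"

def serial_tools(messages: List[Message]) -> List[Message]:
    """Run the requested tools one at a time (canned output for the sketch)."""
    outputs = []
    for call in messages[-1]["tool_calls"]:
        outputs.append({"role": "tool",
                        "content": f"{call['name']} -> 1 critical issue"})
    return outputs

def run_react_loop(query: str, max_steps: int = 10) -> List[Message]:
    messages: List[Message] = [{"role": "user", "content": query}]
    for _ in range(max_steps):
        messages.append(call_model(messages))
        if tools_condition(messages) == "end":
            return messages
        messages.extend(serial_tools(messages))
    raise RuntimeError("step limit reached before a Fix Plan was produced")

history = run_react_loop("Execute the Investigation Plan")
print([m["role"] for m in history])
print(history[-1]["content"])
```

The `max_steps` guard plays the same role here as the recursion limit: it stops a model that keeps requesting tools from looping indefinitely.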