Run this notebook locally, with a `.env` file that contains your Google API key in the form `GOOGLE_API_KEY=your_key`.

Uncomment and run the cell below to install the dependencies

In [154]:
# pip install -r requirements.txt

In [None]:
# Imports
import asyncio
import json
import logging
import os
import platform
import re
import shutil
import subprocess
import time
from datetime import datetime
from typing import Any, Dict

import psutil
from dotenv import load_dotenv

from google.adk.agents import Agent
from google.adk.models.google_llm import Gemini
from google.adk.runners import InMemoryRunner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search
from google.adk.tools.agent_tool import AgentTool
from google.genai import types

In [156]:
# Load variables from .env file
load_dotenv()

# Access the key
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# Only report whether a key was found (True/False); the key itself is never printed.
print("‚úÖ Gemini API key loaded:", bool(GOOGLE_API_KEY))

# Setup logging
# INFO-level root logging with "time - level - message" lines.
logging.basicConfig(
    level=logging.INFO, 
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Named logger used by the orchestrator's progress messages.
logger = logging.getLogger("OptimAIzer")

‚úÖ Gemini API key loaded: True


In [157]:
# Configure Retry Options
# Retry policy passed to the Gemini model as `retry_options`.
# NOTE(review): with initial_delay=1 and exp_base=7 the backoff presumably grows
# roughly 1s, 7s, 49s, ... between attempts (a library-side max delay may cap it) —
# confirm against the google-genai HttpRetryOptions documentation.

retry_config=types.HttpRetryOptions(
    attempts=5,  # Maximum retry attempts
    exp_base=7,  # Delay multiplier
    initial_delay=1, # Initial delay before first retry (in seconds)
    http_status_codes=[429, 500, 503, 504] # Retry on these HTTP errors
)

## 1. System Monitoring Tools (Custom Tools)

In [158]:
def get_cpu_usage() -> dict:
    """Sample CPU utilisation and report it per CPU and as an overall average.

    The psutil call uses ``interval=1``, so this function blocks for about one
    second while the sample is taken.

    Returns:
        dict with:
          - "overall_usage_percent": mean of the per-CPU readings, 1 decimal
          - "per_core_usage": the list of readings returned by psutil
          - "timestamp": ISO-8601 time at which the result was assembled
    """
    per_core = psutil.cpu_percent(interval=1, percpu=True)
    # The overall figure is derived from the same sample instead of sampling twice.
    mean_usage = sum(per_core) / len(per_core)
    return {
        "overall_usage_percent": round(mean_usage, 1),
        "per_core_usage": per_core,
        "timestamp": datetime.now().isoformat(),
    }


def get_memory_usage() -> dict:
    """Report virtual-memory totals from psutil.

    Returns:
        dict with "total_gb" and "used_gb" (bytes divided by 1024**3, rounded
        to 2 decimals), "usage_percent" (psutil's ``percent`` field, passed
        through unchanged) and an ISO-8601 "timestamp".
    """
    bytes_per_gb = 1024 ** 3
    vm = psutil.virtual_memory()
    total_gb = round(vm.total / bytes_per_gb, 2)
    used_gb = round(vm.used / bytes_per_gb, 2)
    return {
        "total_gb": total_gb,
        "used_gb": used_gb,
        "usage_percent": vm.percent,
        "timestamp": datetime.now().isoformat(),
    }


def get_disk_usage() -> dict:
    """Collect size, free space and usage percentage for each disk partition.

    Partitions whose usage cannot be read (any exception while querying or
    building the entry) are silently left out of the result.

    Returns:
        dict with "partitions" (list of dicts holding "device", "total_gb",
        "free_gb", "usage_percent") and an ISO-8601 "timestamp".
    """
    bytes_per_gb = 1024 ** 3
    entries = []
    for part in psutil.disk_partitions():
        try:
            stats = psutil.disk_usage(part.mountpoint)
            entry = {
                "device": part.device,
                "total_gb": round(stats.total / bytes_per_gb, 2),
                "free_gb": round(stats.free / bytes_per_gb, 2),
                "usage_percent": stats.percent,
            }
        except Exception:
            # Best effort: an unreadable mount point must not abort the scan.
            continue
        entries.append(entry)
    return {"partitions": entries, "timestamp": datetime.now().isoformat()}


def get_top_processes() -> dict:
    """Get top 5 processes by CPU and Memory usage.

    Processes whose name is unavailable (``None``) are kept in the ranking;
    only the Windows "System Idle Process" entry is excluded. Missing
    ``cpu_percent`` / ``memory_percent`` values are ranked as 0.

    Returns:
        dict with "top_cpu" and "top_memory" (each a list of up to five psutil
        info dicts with keys pid, name, cpu_percent, memory_percent) and an
        ISO-8601 "timestamp".
    """
    # NOTE(review): psutil documents that a process's first cpu_percent reading
    # is 0.0, so the CPU ranking is presumably only meaningful from the second
    # call onwards — confirm against the psutil docs.
    processes = []
    for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
        info = proc.info
        # The name can be None; treat it as "" so the process is ranked
        # instead of being dropped by an AttributeError on .lower().
        name = info.get('name') or ""
        if name.lower() == "system idle process":
            continue  # skip idle process
        processes.append(info)

    top_cpu = sorted(processes, key=lambda p: p['cpu_percent'] or 0, reverse=True)[:5]
    top_mem = sorted(processes, key=lambda p: p['memory_percent'] or 0, reverse=True)[:5]
    return {"top_cpu": top_cpu, "top_memory": top_mem, "timestamp": datetime.now().isoformat()}


def get_gpu_usage() -> dict:
    """Query GPU via nvidia-smi if available. Fallback gracefully if not.

    Each output line of nvidia-smi is parsed independently. A field that is
    not numeric (for example ``[N/A]``) is reported as ``None`` instead of
    discarding the data of every GPU; lines that do not have exactly three
    comma-separated fields are skipped.

    Returns:
        dict with "gpus" (list of dicts holding "utilization_percent",
        "memory_used_mb", "memory_total_mb") and an ISO-8601 "timestamp".
        If nvidia-smi cannot be run, fails, or does not answer within 10
        seconds, "gpus" is empty and a "note" key explains why.
    """
    def _to_float(raw):
        # nvidia-smi prints placeholders for unsupported metrics.
        try:
            return float(raw)
        except ValueError:
            return None

    try:
        out = subprocess.check_output(
            ["nvidia-smi", "--query-gpu=utilization.gpu,memory.used,memory.total",
             "--format=csv,noheader,nounits"],
            stderr=subprocess.DEVNULL,
            timeout=10,  # never let a hung driver tool block the whole scan
        ).decode().strip()
    except Exception:
        return {"gpus": [], "note": "nvidia-smi not available", "timestamp": datetime.now().isoformat()}

    gpus = []
    for line in out.splitlines():
        fields = [x.strip() for x in line.split(",")]
        if len(fields) != 3:
            continue
        util, mem_used, mem_total = (_to_float(field) for field in fields)
        gpus.append({
            "utilization_percent": util,
            "memory_used_mb": mem_used,
            "memory_total_mb": mem_total
        })
    return {"gpus": gpus, "timestamp": datetime.now().isoformat()}


def get_network_usage() -> dict:
    """Report the network I/O counters exposed by psutil.

    Returns:
        dict with "bytes_sent", "bytes_recv", "packets_sent", "packets_recv"
        and an ISO-8601 "timestamp"; if anything fails, a dict with "error"
        (the exception text) and "timestamp" instead.
    """
    counter_names = ("bytes_sent", "bytes_recv", "packets_sent", "packets_recv")
    try:
        counters = psutil.net_io_counters()
        stats = {field: getattr(counters, field) for field in counter_names}
        stats["timestamp"] = datetime.now().isoformat()
        return stats
    except Exception as exc:
        return {"error": str(exc), "timestamp": datetime.now().isoformat()}


def get_battery() -> dict:
    """Report battery charge and power state when psutil exposes a battery.

    Returns:
        dict with "available": True, "percent", "plugged_in" (bool) and
        "timestamp" when a battery is reported; otherwise (no battery, or any
        error while querying) {"available": False, "timestamp": ...}.
    """
    try:
        battery = psutil.sensors_battery()
        if battery is not None:
            return {
                "available": True,
                "percent": battery.percent,
                "plugged_in": bool(battery.power_plugged),
                "timestamp": datetime.now().isoformat(),
            }
    except Exception:
        # Any failure is reported the same way as "no battery".
        pass
    return {"available": False, "timestamp": datetime.now().isoformat()}


def get_temperatures() -> dict:
    """Report the current reading of every temperature sensor psutil exposes.

    Returns:
        dict with "temperatures" mapping each sensor group name to a list of
        current readings (floats rounded to 1 decimal) and an ISO-8601
        "timestamp". If the sensors cannot be read for any reason,
        "temperatures" is an empty dict.
    """
    readings = {}
    try:
        for sensor_name, entries in psutil.sensors_temperatures().items():
            current_values = []
            for entry in entries:
                if hasattr(entry, "current"):
                    current_values.append(round(float(entry.current), 1))
            readings[sensor_name] = current_values
    except Exception:
        return {"temperatures": {}, "timestamp": datetime.now().isoformat()}
    return {"temperatures": readings, "timestamp": datetime.now().isoformat()}


def get_system_info() -> str:
    """Run every monitoring tool and bundle the results into one JSON string.

    Returns:
        A JSON document (indent=2) with static facts ("platform", "processor",
        "ram_gb", "cpu_cores"), one key per monitoring tool ("cpu", "memory",
        "disk", "top_processes", "gpu", "network", "battery", "temperatures")
        and a "timestamp". If anything raises, the JSON contains only "error"
        and "timestamp".
    """
    try:
        snapshot = {
            "platform": platform.system(),
            "processor": platform.processor(),
            "ram_gb": round(psutil.virtual_memory().total / (1024.0 ** 3), 2),
            "cpu_cores": psutil.cpu_count(logical=False),
        }
        # Collectors run in this order; each contributes one top-level key.
        collectors = (
            ("cpu", get_cpu_usage),
            ("memory", get_memory_usage),
            ("disk", get_disk_usage),
            ("top_processes", get_top_processes),
            ("gpu", get_gpu_usage),
            ("network", get_network_usage),
            ("battery", get_battery),
            ("temperatures", get_temperatures),
        )
        for key, collect in collectors:
            snapshot[key] = collect()
        snapshot["timestamp"] = datetime.now().isoformat()
        return json.dumps(snapshot, indent=2)
    except Exception as exc:
        failure = {"error": str(exc), "timestamp": datetime.now().isoformat()}
        return json.dumps(failure, indent=2)


print("‚úÖ Monitoring tools ready")

‚úÖ Monitoring tools ready


## 2. Auto-Fix Tools (Actions)

In [159]:
# Global OS type (to avoid repeated calls)
# Value of platform.system(); the auto-fix tools branch on "Windows", "Darwin" and "Linux".
OS_TYPE = platform.system()

# CROSS-PLATFORM AUTO-FIX TOOLS

def restart_search_service() -> dict:
    """Restart system search/indexing service cross-platform.

    Branches on the module-level ``OS_TYPE``:
      - Windows: ``net stop WSearch`` followed by ``net start WSearch``
      - macOS:   ``launchctl kickstart -k system/com.apple.metadata.mds``
      - Linux:   ``systemctl restart tracker-miner-fs-3``

    Returns:
        dict with "status", "message" and an ISO-8601 "timestamp". "status" is
        "SUCCESS" only when the restart command exited with code 0; otherwise
        "FAILED" (Windows/macOS), "UNAVAILABLE" (Linux), "UNSUPPORTED"
        (other OS) or "ERROR" (the command could not be launched).
    """
    try:
        if OS_TYPE == "Windows":
            # Argument lists are passed directly; no shell is needed to run net.
            subprocess.run(['net', 'stop', 'WSearch'], capture_output=True)
            result = subprocess.run(['net', 'start', 'WSearch'], capture_output=True)
            return {
                "status": "SUCCESS" if result.returncode == 0 else "FAILED",
                "message": "Windows Search restarted" if result.returncode == 0 else "Requires admin rights",
                "timestamp": datetime.now().isoformat()
            }

        elif OS_TYPE == "Darwin": # macOS
            result = subprocess.run(
                ['launchctl', 'kickstart', '-k', 'system/com.apple.metadata.mds'],
                capture_output=True,
            )
            # Report the real outcome instead of assuming the restart worked.
            succeeded = result.returncode == 0
            return {
                "status": "SUCCESS" if succeeded else "FAILED",
                "message": "macOS Spotlight indexing restarted" if succeeded
                           else "macOS Spotlight restart failed (may require admin rights)",
                "timestamp": datetime.now().isoformat()
            }

        elif OS_TYPE == "Linux":
            # tracker3 is common in GNOME; other systems may differ
            result = subprocess.run(['systemctl', 'restart', 'tracker-miner-fs-3'], capture_output=True)
            if result.returncode == 0:
                return {"status": "SUCCESS", "message": "Linux tracker indexer restarted", "timestamp": datetime.now().isoformat()}
            else:
                return {"status": "UNAVAILABLE", "message": "Linux indexer not available on this system", "timestamp": datetime.now().isoformat()}

        else:
            return {"status": "UNSUPPORTED", "message": f"Unsupported OS: {OS_TYPE}", "timestamp": datetime.now().isoformat()}

    except Exception as e:
        return {"status": "ERROR", "message": str(e), "timestamp": datetime.now().isoformat()}


def clear_temp_files() -> dict:
    """Clear temporary files in a cross-platform way.

    Deletes every regular file (and symlink to a file) found directly inside
    the temp directories: %TEMP% / %TMP% on Windows, /tmp and /var/tmp
    elsewhere. Sub-directories are not entered or removed.

    WARNING: files are removed regardless of age or owner, so temp files that
    other running programs still rely on may be deleted.

    Returns:
        dict with "status": "SUCCESS", "freed_mb" (space of the files that
        were actually deleted, in MiB rounded to 2 decimals) and "timestamp";
        or "status": "ERROR" with a "message" if the scan itself fails.
    """
    try:
        if OS_TYPE == "Windows":
            candidates = [os.environ.get("TEMP"), os.environ.get("TMP")]
        else:
            candidates = ["/tmp", "/var/tmp"]

        # TEMP and TMP usually name the same folder; scan each directory once.
        temp_dirs = []
        for path in candidates:
            if path and os.path.isdir(path):
                resolved = os.path.realpath(path)
                if resolved not in temp_dirs:
                    temp_dirs.append(resolved)

        total_freed = 0
        for directory in temp_dirs:
            for filename in os.listdir(directory):
                fpath = os.path.join(directory, filename)
                try:
                    if not os.path.isfile(fpath):
                        continue
                    # lstat: a symlink counts for its own size, not its target's.
                    size = os.lstat(fpath).st_size
                    os.unlink(fpath)
                except OSError:
                    continue  # skip files we cannot delete
                # Only count space once the file is really gone.
                total_freed += size

        return {"status": "SUCCESS", "freed_mb": round(total_freed / (1024*1024), 2), "timestamp": datetime.now().isoformat()}

    except Exception as e:
        return {"status": "ERROR", "message": str(e), "timestamp": datetime.now().isoformat()}


def optimize_memory() -> dict:
    """Cross-platform memory optimization (safe).

    Branches on the module-level ``OS_TYPE``:
      - Windows: runs ``rundll32.exe advapi32.dll,ProcessIdleTasks``
      - macOS:   runs ``sudo -n purge``; ``-n`` makes sudo fail immediately
                 instead of waiting for a password prompt nobody can answer
      - Linux:   executes nothing and only returns a manual tip

    Returns:
        dict with "status" ("SUCCESS", "FAILED", "TIP", "UNSUPPORTED" or
        "ERROR"), "message" and an ISO-8601 "timestamp" (plus "note" for the
        Linux tip).
    """
    try:
        if OS_TYPE == "Windows":
            result = subprocess.run(['rundll32.exe', 'advapi32.dll,ProcessIdleTasks'], capture_output=True)
            succeeded = result.returncode == 0
            return {
                "status": "SUCCESS" if succeeded else "FAILED",
                "message": "Windows memory optimization triggered" if succeeded else "Windows memory optimization failed",
                "timestamp": datetime.now().isoformat()
            }

        elif OS_TYPE == "Darwin":
            # Non-interactive sudo: without cached credentials this returns
            # non-zero right away rather than hanging on a hidden prompt.
            result = subprocess.run(['sudo', '-n', 'purge'], capture_output=True)
            return {
                "status": "SUCCESS" if result.returncode == 0 else "FAILED",
                "message": "macOS purge executed" if result.returncode == 0 else "macOS purge requires admin rights",
                "timestamp": datetime.now().isoformat()
            }

        elif OS_TYPE == "Linux":
            # Safe suggestion instead of auto-executing
            return {
                "status": "TIP",
                "message": "Linux memory cleanup tip: sudo sh -c 'sync; echo 3 > /proc/sys/vm/drop_caches'",
                "note": "Not executed automatically for safety",
                "timestamp": datetime.now().isoformat()
            }

        else:
            return {"status": "UNSUPPORTED", "message": f"Unsupported OS: {OS_TYPE}", "timestamp": datetime.now().isoformat()}

    except Exception as e:
        return {"status": "ERROR", "message": str(e), "timestamp": datetime.now().isoformat()}


def analyze_process(process_name: str) -> dict:
    """Look up running processes whose name equals ``process_name``.

    The comparison is case-insensitive and requires an exact name match.

    Args:
        process_name: process name to search for (e.g. an executable name).

    Returns:
        {"status": "FOUND", "processes": [...]} with one psutil info dict
        (pid, name, cpu_percent, memory_percent) per match,
        {"status": "NOT_FOUND", "message": ...} when nothing matches, or
        {"status": "ERROR", "message": ...} if the lookup raises. Every
        variant also carries an ISO-8601 "timestamp".
    """
    try:
        hits = [
            proc.info
            for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent'])
            if proc.info['name'] and proc.info['name'].lower() == process_name.lower()
        ]
        if not hits:
            return {"status": "NOT_FOUND", "message": f"Process '{process_name}' not found", "timestamp": datetime.now().isoformat()}
        return {"status": "FOUND", "processes": hits, "timestamp": datetime.now().isoformat()}
    except Exception as exc:
        return {"status": "ERROR", "message": str(exc), "timestamp": datetime.now().isoformat()}


print("‚úÖ Cross-platform auto-fix tools loaded successfully!")

‚úÖ Cross-platform auto-fix tools loaded successfully!


## 3. Agent Creation (with the custom tools)

In [160]:
# Shared model
# One Gemini model object reused by every agent, configured with retry_config.
# NOTE(review): confirm that this ADK version's Gemini model actually consumes
# `api_key`; if it does not, the key is presumably picked up from the
# GOOGLE_API_KEY environment variable instead.
gemini_model = Gemini(model="gemini-2.5-flash-lite", api_key=GOOGLE_API_KEY, retry_options=retry_config)

# Monitor Agent
# Its only tool is get_system_info, which returns the full JSON snapshot in one call.
monitor_agent = Agent(
    model=gemini_model,
    name="monitor",
    instruction="""You are a system monitor. Call get_system_info() which returns everything:
        - CPU usage
        - Memory usage  
        - Disk usage
        - Top processes
        - GPU
        - Network stats
        - Battery
        - Temperatures
        Summarize the results in plain English. Flag any issues:
        - CPU > 80%
        - Memory > 85%
        - Disk > 90%
        - High resource processes""",
    tools=[get_system_info],
)

# Analyzer Agent
# Receives the monitor's text in its prompt; analyze_process lets it look up a process by name.
analyzer_agent = Agent(
    model=gemini_model,
    name="analyzer",
    instruction="""You are a system performance analyzer:
            Analyze system data. Identify root causes, assess severity (Critical/High/Medium/Low), and explain impact (You don't ask questions or wait for responses). 
            Focus on CPU, memory, disk, and process issues.""",
    tools=[analyze_process],
)

# Dynamic Auto-Fix Agent
# ADK documents that a built-in tool such as google_search cannot be combined
# with other tools inside one agent. Search therefore lives in its own
# single-tool agent, which the auto-fix agent calls through AgentTool.
search_agent = Agent(
    model=gemini_model,
    name="search_agent",
    description="Researches system performance problems and their fixes using Google Search.",
    instruction="""You are a research assistant for system troubleshooting.
        Use google_search to research the issue you are given and summarize the most reliable fixes.""",
    tools=[google_search],
)

autofix_agent = Agent(
    model=gemini_model,
    name="autofix",
    instruction="""You are a dynamic auto-fix agent. For ANY issue on this system:
        1. Use search_agent (Google Search) to research unfamiliar problems
        2. Apply automated fixes using available tools
        3. Provide manual steps when automation isn't possible
        4. Be adaptive - research what you don't know!
    Available fixes: restart_search_service, clear_temp_files, optimize_memory
    Safety: Never execute destructive commands. Always explain what each fix does.""",
    tools=[
        restart_search_service,
        clear_temp_files,
        optimize_memory,
        analyze_process,
        AgentTool(agent=search_agent),  # search delegated to the dedicated agent
    ],
)

print("‚úÖ All agents created")

‚úÖ All agents created


In [161]:
# Memory System
class SystemMemory:
    """Track scan history and recurring issues, with automatic pruning and comparison.

    Attributes are plain instance fields:
      history      -- list of {"timestamp": iso-string, "data": scan_data}, oldest first
      max_history  -- maximum number of scans kept; older ones are dropped
    """

    def __init__(self, max_history=30):
        self.history = []
        self.max_history = max_history  # max number of scans to keep

    def record_scan(self, scan_data):
        """Record a new scan and prune old scans if necessary.

        Args:
            scan_data: any object describing the scan; stored as-is.
        """
        self.history.append({
            "timestamp": datetime.now().isoformat(),
            "data": scan_data
        })
        # Drop the oldest entries. Slicing from the front (rather than [-max:])
        # also behaves correctly for max_history == 0, where [-0:] would keep all.
        overflow = len(self.history) - self.max_history
        if overflow > 0:
            self.history = self.history[overflow:]

    def get_recent_scans(self, limit=5):
        """Return the most recent ``limit`` scans (oldest first); [] if limit <= 0."""
        if limit <= 0:
            # history[-0:] would return the whole list, not an empty one.
            return []
        return self.history[-limit:]

    @staticmethod
    def _section(data, key):
        """Return data[key] when both are dicts, else an empty dict."""
        section = data.get(key) if isinstance(data, dict) else None
        return section if isinstance(section, dict) else {}

    @staticmethod
    def _number(value):
        """Return value if it is a real number, else 0 (missing/None/text readings)."""
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return value
        return 0

    @classmethod
    def _peak_disk_usage(cls, data):
        """Return the highest partition usage_percent in data["disk"], or 0."""
        partitions = cls._section(data, "disk").get("partitions")
        if not isinstance(partitions, list):
            return 0
        return max(
            (cls._number(p.get("usage_percent")) for p in partitions if isinstance(p, dict)),
            default=0,
        )

    def compare_last_two_scans(self):
        """Optional: compare last two scans and return changes (CPU, memory, disk, etc.)

        Expects scan data shaped like the monitoring snapshot, i.e. a dict with
        "cpu" -> "overall_usage_percent", "memory" -> "usage_percent" and
        "disk" -> "partitions" -> [{"usage_percent": ...}]. Anything missing or
        malformed counts as 0.

        Returns:
            The string "Not enough scans to compare." when fewer than two scans
            exist; otherwise a dict with "cpu_change", "memory_change" and
            "disk_change" (newest minus previous; disk uses the fullest partition).
        """
        # NOTE(review): OptimAIzer.run records a dict with the keys timestamp /
        # monitor / analysis / fixes / fixes_applied, none of which are read
        # here, so for those scans every delta is 0.
        if len(self.history) < 2:
            return "Not enough scans to compare."
        old = self.history[-2]['data']
        new = self.history[-1]['data']

        def metric(data, section, field):
            return self._number(self._section(data, section).get(field))

        return {
            "cpu_change": metric(new, "cpu", "overall_usage_percent") - metric(old, "cpu", "overall_usage_percent"),
            "memory_change": metric(new, "memory", "usage_percent") - metric(old, "memory", "usage_percent"),
            "disk_change": self._peak_disk_usage(new) - self._peak_disk_usage(old),
        }

# Module-level scan history (default max_history=30); passed to the OptimAIzer orchestrator.
memory = SystemMemory()
print("‚úÖ Memory system ready")

‚úÖ Memory system ready


In [162]:
# Main Orchestrator with Smart Summarization
class OptimAIzer:
    """Main orchestrator - coordinates all agents.

    Runs the monitor agent, feeds its text to the analyzer agent, optionally
    feeds the analysis to the auto-fix agent, and records every scan result in
    the supplied memory object.
    """

    def __init__(self, monitor, analyzer, autofix, memory):
        self.monitor = monitor    # agent that gathers the system status
        self.analyzer = analyzer  # agent that interprets the monitor output
        self.autofix = autofix    # agent that applies / suggests fixes
        self.memory = memory      # object exposing record_scan(result)

    def _extract_text(self, responses):
        """Extract text from agent responses.

        Walks every event's ``content.parts`` and returns the text of the LAST
        part that has non-empty text, or "" when there is none.
        """
        text = ""
        for event in responses:
            content = getattr(event, 'content', None)
            if not content:
                continue
            for part in getattr(content, 'parts', None) or []:
                part_text = getattr(part, 'text', None)
                if part_text:
                    text = part_text
        return text

    @staticmethod
    def _first_present(mapping, *keys):
        """Return the first mapping[key] that is neither None nor "" (0 is a valid value)."""
        for key in keys:
            value = mapping.get(key)
            if value not in (None, ""):
                return value
        return None

    def _summarize_monitor_data(self, monitor_text):
        """Extract key metrics from monitor output.

        Accepts either JSON string (preferred) or free text fallback.

        Returns:
            dict that always contains "raw_data". For JSON input it also holds
            "cpu_usage", "memory_usage" and "gpu" (floats or None) plus the
            "disk", "network", "battery" and "temperatures" sections as parsed.
            "gpu" is the explicit gpu_percent / overall_gpu_percent value when
            present, otherwise the highest "utilization_percent" found in the
            "gpus" list produced by get_gpu_usage. For free text, "cpu_usage" /
            "memory_usage" are set only when a matching number is found.
        """
        summary = {"raw_data": monitor_text}
        # Try JSON parse first (monitor_tool returns JSON)
        try:
            parsed = json.loads(monitor_text)
            # drill into nested structure if present
            cpu = parsed.get("cpu", {})
            mem = parsed.get("memory", {})
            gpu = parsed.get("gpu", {})

            # Key lookups keep a genuine 0 / 0.0 reading instead of treating it as missing.
            cpu_percent = None
            if isinstance(cpu, dict):
                cpu_percent = self._first_present(cpu, "cpu_percent", "overall_usage_percent")
            mem_percent = None
            if isinstance(mem, dict):
                mem_percent = self._first_present(mem, "usage_percent", "percent")
            gpu_percent = None
            if isinstance(gpu, dict):
                gpu_percent = self._first_present(gpu, "gpu_percent", "overall_gpu_percent")
                if gpu_percent is None:
                    # Shape emitted by get_gpu_usage: {"gpus": [{"utilization_percent": ...}, ...]}
                    loads = [
                        entry.get("utilization_percent")
                        for entry in (gpu.get("gpus") or [])
                        if isinstance(entry, dict)
                    ]
                    loads = [v for v in loads if isinstance(v, (int, float))]
                    if loads:
                        gpu_percent = max(loads)

            summary.update({
                "cpu_usage": float(cpu_percent) if cpu_percent is not None else None,
                "memory_usage": float(mem_percent) if mem_percent is not None else None,
                "disk": parsed.get("disk", {}),
                "gpu": float(gpu_percent) if gpu_percent is not None else None,
                "network": parsed.get("network", {}),
                "battery": parsed.get("battery", {}),
                "temperatures": parsed.get("temperatures", {})
            })
            return summary

        except Exception:
            # fallback: regex parse the original simple text
            try:
                cpu_match = re.search(r'(\d+\.?\d*)%?\s*(?:CPU|cpu)', monitor_text)
                if cpu_match:
                    summary['cpu_usage'] = float(cpu_match.group(1))
                mem_match = re.search(r'(\d+\.?\d*)%?\s*(?:RAM|Memory|memory)', monitor_text)
                if mem_match:
                    summary['memory_usage'] = float(mem_match.group(1))
            except Exception:
                # Non-text input: return whatever was gathered so far.
                pass
            return summary

    async def run(self, apply_fixes=False):
        """Run complete scan with optional auto-fix.

        Args:
            apply_fixes: when True, the auto-fix agent is run on the analysis;
                when False (default) no fix tool is invoked.

        Returns:
            dict with "timestamp", "monitor" (monitor agent text), "analysis"
            (analyzer agent text), "fixes" (auto-fix agent text or a notice
            that auto-fix is disabled) and "fixes_applied". The same dict is
            recorded in ``self.memory``.
        """
        logger.info("Starting scan...")

        # Step 1: Monitor
        logger.info("Step 1: Monitoring system...")
        monitor_runner = InMemoryRunner(agent=self.monitor)
        monitor_resp = await monitor_runner.run_debug("Get complete system status with top 5 processes by CPU and memory")
        monitor_text = self._extract_text(monitor_resp)

        # Step 2: Analyze (pass full data to analyzer)
        logger.info("Step 2: Analyzing issues...")
        analyzer_runner = InMemoryRunner(agent=self.analyzer)

        # Create a structured prompt instead of dumping raw text
        analysis_prompt = f"""Analyze this system:

{monitor_text}

Identify:
1. Critical issues (anything above threshold)
2. Root causes
3. Severity levels
4. Which processes are responsible"""

        analyzer_resp = await analyzer_runner.run_debug(analysis_prompt)
        analysis_text = self._extract_text(analyzer_resp)

        # Step 3: Auto-Fix (if enabled)
        fix_text = "Auto-fix disabled (run with apply_fixes=True to enable)"
        if apply_fixes:
            logger.info("Step 3: Applying fixes...")
            autofix_runner = InMemoryRunner(agent=self.autofix)

            fix_prompt = f"""Based on this analysis, apply fixes:

{analysis_text}

Use your tools to:
1. Research any unfamiliar issues
2. Apply automated fixes where safe
3. Provide manual instructions for complex issues"""

            fix_resp = await autofix_runner.run_debug(fix_prompt)
            fix_text = self._extract_text(fix_resp)

        result = {
            "timestamp": datetime.now().isoformat(),
            "monitor": monitor_text,
            "analysis": analysis_text,
            "fixes": fix_text,
            "fixes_applied": apply_fixes
        }

        self.memory.record_scan(result)
        logger.info("Scan complete!")
        return result

# Recreate OptimAIzer
# Wires the three agents and the module-level SystemMemory instance into one orchestrator.
guard = OptimAIzer(monitor_agent, analyzer_agent, autofix_agent, memory)
print("‚úÖ OptimAIzer ready!")

‚úÖ OptimAIzer ready!


In [163]:
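# # Universal Report Generator - Auto-Detects (superseded draft, kept for reference; the active generate_user_report is defined in the next cell)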
# def generate_user_report(result: Dict[str, Any]) -> str:
#     """Generate a clean, readable report that automatically understands any app"""
    
#     report = []
#     report.append("‚ïî" + "="*78 + "‚ïó")
#     report.append("‚ïë" + " "*20 + "OPTIMAIZER HEALTH REPORT" + " "*33 + "‚ïë")
#     report.append("‚ïö" + "="*78 + "‚ïù\n")
    
#     monitor = result['monitor']
#     analysis = result['analysis']
    
#     # Parse key metrics
#     import re
#     mem_match = re.search(r'Memory usage.*?(\d+\.?\d*)%', monitor, re.IGNORECASE)
#     cpu_match = re.search(r'CPU usage.*?(\d+\.?\d*)%', monitor, re.IGNORECASE)
    
#     memory_pct = float(mem_match.group(1)) if mem_match else 0
#     cpu_pct = float(cpu_match.group(1)) if cpu_match else 0
    
#     # Health Status
#     report.append("üè• OVERALL HEALTH STATUS")
#     report.append("-" * 80)
    
#     issues_count = 0
#     if memory_pct >= 84:
#         issues_count += 1
#     if cpu_pct > 80:
#         issues_count += 1
    
#     if issues_count == 0:
#         report.append("‚úÖ HEALTHY - Your system is running well!")
#     elif issues_count == 1:
#         report.append("‚ö†Ô∏è  NEEDS ATTENTION - Some optimization recommended")
#     else:
#         report.append("üî¥ CRITICAL - Immediate action needed")
    
#     report.append("")
    
#     # Quick Stats
#     report.append("üìä QUICK STATS")
#     report.append("-" * 80)
    
#     mem_status = "üî¥ CRITICAL" if memory_pct >= 85 else "‚ö†Ô∏è  HIGH" if memory_pct >= 80 else "‚úÖ Normal"
#     report.append(f"üíæ Memory Usage: {memory_pct:.1f}% {mem_status}")
#     report.append(f"‚ö° CPU Usage: {cpu_pct:.1f}% {'‚ö†Ô∏è HIGH' if cpu_pct > 80 else '‚úÖ Normal'}")
    
#     # GPU, Battery (if available)
#     gpu_match = re.search(r'GPU utilization.*?(\d+)%', monitor, re.IGNORECASE)
#     if gpu_match:
#         gpu_pct = int(gpu_match.group(1))
#         report.append(f"üéÆ GPU Usage: {gpu_pct}% {'‚ö†Ô∏è HIGH' if gpu_pct > 80 else '‚úÖ Normal'}")
    
#     battery_match = re.search(r'battery.*?(\d+)%', monitor, re.IGNORECASE)
#     if battery_match:
#         battery_pct = int(battery_match.group(1))
#         report.append(f"üîã Battery: {battery_pct}%")
    
#     report.append("")
    
#     # ============================================================================
#     # SMART RESOURCE ANALYSIS - Automatically groups similar processes
#     # ============================================================================
#     report.append("üî• TOP RESOURCE USERS")
#     report.append("-" * 80)
    
#     # Extract process info with percentages
#     # Pattern: process.exe (PID 123): XX.X% Memory or XX.X% CPU
#     process_pattern = r'([A-Za-z0-9_\-.]+\.exe)\s*\(PID\s+\d+\)[:\s]+(\d+\.?\d*)%\s+(Memory|CPU)'
#     matches = re.findall(process_pattern, monitor, re.IGNORECASE)
    
#     # Group by process name and sum memory
#     process_memory = {}
#     for proc_name, percentage, metric_type in matches:
#         if metric_type.lower() == 'memory':
#             base_name = proc_name.lower().replace('.exe', '')
#             pct = float(percentage)
            
#             if base_name in process_memory:
#                 process_memory[base_name]['total'] += pct
#                 process_memory[base_name]['count'] += 1
#             else:
#                 process_memory[base_name] = {
#                     'name': proc_name.replace('.exe', ''),
#                     'total': pct,
#                     'count': 1
#                 }
    
#     # Sort by total memory
#     top_apps = sorted(process_memory.items(), key=lambda x: x[1]['total'], reverse=True)[:7]
    
#     # Display with smart formatting
#     for app_key, app_data in top_apps:
#         # Skip system idle process
#         if 'idle' in app_key.lower():
#             continue
        
#         # Smart display name
#         display_name = app_data['name'].replace('.exe', '')
        
#         if app_data['count'] > 1:
#             report.append(f"‚Ä¢ {display_name}: {app_data['total']:.1f}% RAM ({app_data['count']} instances)")
#         else:
#             report.append(f"‚Ä¢ {display_name}: {app_data['total']:.1f}% RAM")
    
#     report.append("")
    
#     # ============================================================================
#     # SMART RECOMMENDATIONS - Based on actual data patterns
#     # ============================================================================
#     report.append("üí° WHAT YOU SHOULD DO")
#     report.append("-" * 80)
    
#     solutions = []
    
#     # High memory - recommend closing top apps
#     if memory_pct >= 84:
#         top_culprits = [app[1]['name'].replace('.exe', '') for app in top_apps[:3] if 'idle' not in app[0].lower()]
        
#         solutions.append({
#             "priority": "HIGH" if memory_pct >= 85 else "MEDIUM",
#             "issue": "High Memory Usage",
#             "impact": "System may slow down, freeze, or crash applications",
#             "fix": f"Close unused applications. Top users: {', '.join(top_culprits)}"
#         })
    
#     # Find processes with multiple instances
#     for app_key, app_data in top_apps:
#         if app_data['count'] > 3:  # More than 3 instances
#             solutions.append({
#                 "priority": "LOW",
#                 "issue": f"Multiple {app_data['name'].replace('.exe', '')} Instances",
#                 "impact": f"Using {app_data['total']:.1f}% RAM total",
#                 "fix": f"Close {app_data['name'].replace('.exe', '')} windows you're not using"
#             })
    
#     # Find single processes using lots of memory
#     for app_key, app_data in top_apps:
#         if app_data['count'] == 1 and app_data['total'] > 8:  # Single process using >8% RAM
#             solutions.append({
#                 "priority": "MEDIUM",
#                 "issue": f"{app_data['name'].replace('.exe', '')} Using High Memory",
#                 "impact": f"Single process using {app_data['total']:.1f}% of RAM",
#                 "fix": f"Restart {app_data['name'].replace('.exe', '')} to free memory"
#             })
    
#     # Memory compression detection
#     if any('memcompression' in app[0].lower() for app in top_apps):
#         solutions.append({
#             "priority": "INFO",
#             "issue": "Memory Compression Active",
#             "impact": "Windows is compressing RAM (sign of memory pressure)",
#             "fix": "Close applications to reduce memory usage"
#         })
    
#     # High CPU
#     if cpu_pct > 80:
#         cpu_pattern = r'([A-Za-z0-9_\-.]+\.exe)[^0-9]*(\d+\.?\d*)%\s*(?:CPU)'
#         cpu_processes = re.findall(cpu_pattern, monitor, re.IGNORECASE)
        
#         if cpu_processes:
#             # Find highest CPU user
#             top_cpu = max(cpu_processes, key=lambda x: float(x[1]))
#             if float(top_cpu[1]) > 20:  # Using more than 20% CPU
#                 solutions.append({
#                     "priority": "HIGH",
#                     "issue": f"{top_cpu[0].replace('.exe', '')} Using High CPU",
#                     "impact": "Causes slowdowns, fan noise, battery drain",
#                     "fix": f"Check why {top_cpu[0].replace('.exe', '')} is using {top_cpu[1]}% CPU - may need to restart it"
#                 })
    
#     # Display solutions
#     if not solutions:
#         report.append("‚úÖ No immediate action needed - system is healthy!")
#     else:
#         priority_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2, "INFO": 3}
#         solutions.sort(key=lambda x: priority_order.get(x['priority'], 99))
        
#         priority_emoji = {"HIGH": "üî¥", "MEDIUM": "‚ö†Ô∏è", "LOW": "üí°", "INFO": "‚ÑπÔ∏è "}
        
#         for i, sol in enumerate(solutions, 1):
#             emoji = priority_emoji.get(sol['priority'], "‚Ä¢")
#             report.append(f"\n{emoji} {i}. {sol['issue']}")
#             report.append(f"   Why it matters: {sol['impact']}")
#             report.append(f"   Quick fix: {sol['fix']}")
    
#     report.append("")
    
#     # Auto-fix option
#     if issues_count > 0:
#         report.append("-" * 80)
#         if result.get('fixes_applied'):
#             report.append("‚úÖ AUTO-FIX APPLIED")
#         else:
#             report.append("üîß Run: await interactive_guard.run_interactive()")
    
#     report.append("")
#     report.append("‚ïö" + "="*78 + "‚ïù")
    
#     return "\n".join(report)

# print("‚úÖ Smart universal report generator ready")

In [None]:
def _find_first_metric(text, patterns):
    """Return group(1) of the first regex in ``patterns`` that matches ``text``.

    Patterns are tried in order, case-insensitively. Returns None when none match.
    """
    for pat in patterns:
        m = re.search(pat, text, re.IGNORECASE)
        if m:
            return m.group(1)
    return None


def _parse_process_usage(monitor):
    """Collect per-process CPU and memory percentages from the monitor text.

    Only names ending in ``.exe`` are recognised. Two layouts are supported:

    * inline labels on the same line, e.g. ``chrome.exe ... 10.6% memory``;
    * sectioned lists, where a heading (or the text before the first process
      on a line) mentions CPU or memory/RAM and the entries carry a bare
      percentage, e.g. ``* Code.exe (PID 31516): 24.6%``.

    Returns:
        (cpu_usage, mem_usage): dicts mapping the lower-cased process name
        (without ``.exe``) to the highest percentage seen for that name.
    """
    cpu_usage: Dict[str, float] = {}
    mem_usage: Dict[str, float] = {}

    def record(kind, name, pct):
        bucket = cpu_usage if kind == "cpu" else mem_usage
        key = name.lower().replace(".exe", "")
        # keep max so the figure per app never exceeds 100%
        bucket[key] = max(bucket.get(key, 0), float(pct))

    # Pass 1: percentages explicitly labelled on the same line.
    # [ \t]* (not \s*) so a heading on the NEXT line cannot label this entry.
    inline_patterns = (
        ("cpu", r'([A-Za-z0-9_\-.]+\.exe).*?(\d+\.?\d*)%[ \t]*CPU'),
        ("memory", r'([A-Za-z0-9_\-.]+\.exe).*?(\d+\.?\d*)%[ \t]*memory'),
    )
    for kind, pat in inline_patterns:
        for name, pct in re.findall(pat, monitor, re.IGNORECASE):
            record(kind, name, pct)

    # Pass 2: unlabelled percentages attributed to the enclosing section.
    entry_re = re.compile(r'([A-Za-z0-9_\-.]+\.exe)[^%\n]*?(\d+(?:\.\d+)?)\s*%', re.IGNORECASE)
    label_re = re.compile(r'[ \t]*(?:of[ \t]+)?(?:cpu|memory|ram)\b', re.IGNORECASE)
    bullet_re = re.compile(r'\s*(?:[*\-+\u2022]\s+|\d+[.)]\s+)')

    section = None
    for line in monitor.splitlines():
        if not line.strip():
            continue  # blank lines do not end a section

        entries = list(entry_re.finditer(line))
        # Text that can act as a heading: everything before the first process.
        lead = line[:entries[0].start()] if entries else line
        has_cpu = re.search(r'\bcpu\b', lead, re.IGNORECASE) is not None
        has_mem = re.search(r'\b(?:memory|ram)\b', lead, re.IGNORECASE) is not None

        if has_cpu and has_mem:
            section = None  # ambiguous heading: do not guess
        elif has_cpu:
            section = "cpu"
        elif has_mem:
            section = "memory"
        elif not entries and not bullet_re.match(line):
            # A non-bullet line without processes is an unrelated heading/prose.
            section = None

        if section is None:
            continue

        for m in entries:
            if label_re.match(line, m.end()):
                continue  # explicitly labelled: already handled by pass 1
            record(section, m.group(1), m.group(2))

    return cpu_usage, mem_usage


def generate_user_report(result: Dict[str, Any]) -> str:
    """Build a plain-text system health report from a scan result.

    Args:
        result: Scan result dict. Only ``result["monitor"]`` (the monitor
            agent's free-text status) is read; a missing or falsy value is
            treated as empty text.

    Returns:
        The report as a single newline-joined string with the sections
        overall health, quick stats, top processes and recommendations.
        Metrics that cannot be found in the text are shown as "Unavailable".
    """

    monitor = result.get("monitor", "") or ""

    report = []
    report.append("‚ïî" + "=" * 78 + "‚ïó")
    report.append("‚ïë" + " " * 20 + "OPTIMAIZER HEALTH REPORT" + " " * 33 + "‚ïë")
    report.append("‚ïö" + "=" * 78 + "‚ïù\n")

    # ------------------------------------------------
    # Flexible metric extraction (multi-pattern)
    # ------------------------------------------------

    mem_raw = _find_first_metric(monitor, [
        r"memory\s*usage[^%]*?(\d+\.?\d*)%",
        r"total\s*memory[^%]*?(\d+\.?\d*)%",
        r"ram\s*usage[^%]*?(\d+\.?\d*)%",
    ])

    cpu_raw = _find_first_metric(monitor, [
        r"cpu\s*usage[^%]*?(\d+\.?\d*)%",
        r"total\s*cpu[^%]*?(\d+\.?\d*)%",
    ])

    gpu_raw = _find_first_metric(monitor, [
        r"gpu\s*(?:load|utilization|usage)[^%]*?(\d+\.?\d*)%",
        r"video\s*engine[^%]*?(\d+\.?\d*)%",
        r"graphics[^%]*?(\d+\.?\d*)%",
    ])

    # Accept decimals: with an integer-only group the lazy prefix swallowed
    # "93." and reported "93.5%" as 5.
    batt_raw = _find_first_metric(monitor, [
        r"battery\s*(?:level|charge|capacity)?[^%]*?(\d+(?:\.\d+)?)\s*%",
        r"battery[^0-9%]*(\d+(?:\.\d+)?)\s*percent",
    ])

    batt_state = _find_first_metric(monitor, [
        r"(plugged\s*in|charging|discharging|on\s*battery|ac\s*power|not\s*charging)"
    ]) or "Unknown"

    mem_pct = float(mem_raw) if mem_raw is not None else None
    cpu_pct = float(cpu_raw) if cpu_raw is not None else None
    gpu_pct = float(gpu_raw) if gpu_raw is not None else None
    batt_pct = float(batt_raw) if batt_raw is not None else None

    # -----------------
    # Health Assessment
    # -----------------

    report.append("üè• OVERALL HEALTH STATUS")
    report.append("-" * 80)

    if mem_pct is None or cpu_pct is None:
        report.append("‚ö†Ô∏è PARTIAL DATA - Metrics unavailable")
    elif mem_pct < 80 and cpu_pct < 80:
        report.append("‚úÖ HEALTHY - Your system is running well!")
    elif mem_pct >= 85 or cpu_pct >= 90:
        report.append("üî¥ CRITICAL - Immediate attention needed")
    else:
        report.append("‚ö†Ô∏è NEEDS ATTENTION - Monitor system performance")

    report.append("")

    # ----------- QUICK STATS -----------

    report.append("üìä QUICK STATS")
    report.append("-" * 80)
    report.append(f"üíæ Memory: {mem_pct:.1f}%" if mem_pct is not None else "üíæ Memory: Unavailable")
    report.append(f"‚ö° CPU: {cpu_pct:.1f}%" if cpu_pct is not None else "‚ö° CPU: Unavailable")
    report.append(f"üéÆ GPU: {gpu_pct:.1f}%" if gpu_pct is not None else "üéÆ GPU: Unavailable")

    if batt_pct is not None:
        # :g prints whole numbers without a trailing ".0" (93.0 -> "93")
        report.append(f"üîã Battery: {batt_pct:g}% ({batt_state})")
    else:
        report.append("üîã Battery: Unavailable")

    report.append("")

    # ----------------------------
    # PROCESS PARSING
    # ----------------------------

    cpu_dict, mem_dict = _parse_process_usage(monitor)

    top_mem = sorted(mem_dict.items(), key=lambda x: x[1], reverse=True)[:5]
    top_cpu = sorted(cpu_dict.items(), key=lambda x: x[1], reverse=True)[:5]

    # -----------------
    # Process Reporting
    # -----------------

    report.append("üî• TOP PROCESSES")
    report.append("-" * 80)

    if not top_mem and not top_cpu:
        report.append("‚Ä¢ No individual processes exceeded reporting thresholds.")
    else:
        for n, p in top_mem:
            report.append(f"üíæ {n}: {p:.1f}% RAM")

        for n, p in top_cpu:
            report.append(f"‚ö° {n}: {p:.1f}% CPU")

    report.append("")

    # -----------------
    # Recommendations
    # -----------------

    report.append("üí° RECOMMENDATIONS")
    report.append("-" * 80)

    suggested = False

    # A high metric always yields a recommendation, even when no individual
    # process could be parsed, so the advice never contradicts the status above.
    if mem_pct is not None and mem_pct >= 80:
        if top_mem:
            names = ", ".join(n for n, _ in top_mem[:3])
            report.append(f"‚Ä¢ Close or restart memory-heavy apps: {names}")
        else:
            report.append("‚Ä¢ Memory usage is high - close unused applications to free RAM.")
        suggested = True

    if cpu_pct is not None and cpu_pct >= 80:
        if top_cpu:
            names = ", ".join(n for n, _ in top_cpu[:3])
            report.append(f"‚Ä¢ Investigate high CPU usage from: {names}")
        else:
            report.append("‚Ä¢ CPU usage is high - check running applications for heavy workloads.")
        suggested = True

    if not suggested:
        report.append("‚Ä¢ No immediate action required.")

    report.append("")
    report.append("‚ïö" + "=" * 78 + "‚ïù")

    return "\n".join(report)


In [165]:
# Run and Display User-Friendly Report
print("Running system check...\n")

result = await guard.run(apply_fixes=False)

# Generate and display clean report
user_report = generate_user_report(result)
print(user_report)

# Optional: Show technical details
print("\n\n" + "="*80)
print("üìã TECHNICAL DETAILS")
print("="*80)
print("\nüîç RAW ANALYSIS:")
print(result['analysis'])

2025-11-29 20:40:25,529 - INFO - Starting scan...
2025-11-29 20:40:25,529 - INFO - Step 1: Monitoring system...


Running system check...


 ### Created new session: debug_session_id

User > Get complete system status with top 5 processes by CPU and memory


2025-11-29 20:40:26,010 - INFO - Sending out request, model: gemini-2.5-flash-lite, backend: GoogleLLMVariant.GEMINI_API, stream: False
2025-11-29 20:40:26,689 - INFO - Response received from the model.
2025-11-29 20:40:28,619 - INFO - Sending out request, model: gemini-2.5-flash-lite, backend: GoogleLLMVariant.GEMINI_API, stream: False
2025-11-29 20:40:31,296 - INFO - Response received from the model.
2025-11-29 20:40:31,303 - INFO - Step 2: Analyzing issues...
2025-11-29 20:40:31,309 - INFO - Sending out request, model: gemini-2.5-flash-lite, backend: GoogleLLMVariant.GEMINI_API, stream: False


monitor > The system status is as follows:

**CPU Usage:** 11.5%
**Memory Usage:** 90.6% - This is HIGH and exceeds the 85% threshold.
**Disk Usage:** 58.9% on C:

**Top Processes by CPU:**
* Code.exe (PID 31516): 24.6%
* System (PID 4): 12.8%
* Code.exe (PID 11960): 12.6%
* dwm.exe (PID 9852): 12.3%
* Copilot.exe (PID 26948): 11.7%

**Top Processes by Memory:**
* chrome.exe (PID 1536): 10.65%
* chrome.exe (PID 53188): 6.32%
* Code.exe (PID 31516): 4.67%
* Code.exe (PID 29796): 4.12%
* Code.exe (PID 35104): 3.33%

**GPU Utilization:** 4.0% with 542 MB used out of 8188 MB.

**Network:**
* Bytes Sent: 493,205,525
* Bytes Received: 8,718,147,220

**Battery:** 93% charged and plugged in.

**Temperatures:** No temperature data available.

**Alerts:**
* Memory usage is critically high at 90.6%. Please consider closing some applications to free up memory.

 ### Created new session: debug_session_id

User > Analyze this system:

The system status is as follows:

**CPU Usage:** 11.5%
**Memory U

2025-11-29 20:40:33,566 - INFO - Response received from the model.
2025-11-29 20:40:33,569 - INFO - Scan complete!


analyzer > ### System Performance Analysis

**1. Critical Issues:**

*   **Memory Usage:** Critically high at 90.6%, exceeding the 85% threshold.

**2. Root Causes:**

*   The primary root cause for the critical memory issue appears to be the `chrome.exe` processes consuming a significant portion of the available memory (10.65% and 6.32% for the top two instances).
*   Multiple instances of `Code.exe` are also contributing to memory consumption (4.67%, 4.12%, 3.33%).

**3. Severity Levels:**

*   **Memory Usage:** Critical
*   **CPU Usage:** Medium (No specific threshold mentioned, but several processes are using a notable amount)

**4. Processes Responsible:**

*   **Memory:**
    *   `chrome.exe` (PID 1536): 10.65%
    *   `chrome.exe` (PID 53188): 6.32%
    *   `Code.exe` (PID 31516): 4.67%
    *   `Code.exe` (PID 29796): 4.12%
    *   `Code.exe` (PID 35104): 3.33%
*   **CPU:**
    *   `Code.exe` (PID 31516): 24.6%
    *   `System` (PID 4): 12.8%
    *   `Code.exe` (PID 11960): 12.6

In [166]:
# Apply Fixes (OPTIONAL - Uncomment to use)
# WARNING: This will make real changes to your system.
# Uncomment below to actually apply fixes:

# print("="*60)
# print("APPLYING AUTO-FIXES")
# print("="*60)
# 
# fix_result = await guard.run(apply_fixes=True)
# 
# print("\n‚úÖ FIXES APPLIED:")
# print(fix_result['fixes'])

## Safe Testing Mode

In [167]:
# CELL 11: Safe Testing Mode - Simulate Fixes
class SafeTestMode:
    """Dry-run helper: asks the auto-fix agent to describe its fixes instead of running them."""

    def __init__(self, autofix_agent):
        # Agent that is prompted for the fix explanation
        self.autofix = autofix_agent

    async def test_fixes(self, issues_description: str):
        """Ask the agent what it would do about ``issues_description``.

        Prints a banner, sends an explain-only prompt through a fresh
        ``InMemoryRunner`` and returns the text of the last non-empty text
        part found in the returned events ("" when there is none).

        NOTE(review): the "no real changes" guarantee rests on the prompt's
        instruction not to call tools; nothing visible here disables the
        agent's tools - confirm against the agent definition.
        """
        banner = "=" * 80
        print(banner)
        print("üß™ SAFE TEST MODE - No Real Changes Will Be Made")
        print(banner)

        test_prompt = f"""
You have these system issues:

{issues_description}

For EACH issue, explain:
1. What fix you would apply
2. Which tool you would use (restart_search_service, clear_temp_files, optimize_memory)
3. What the expected outcome is
4. Any risks or side effects

DO NOT actually call the tools - just explain what you WOULD do.
Be specific about the commands or actions.
"""

        events = await InMemoryRunner(agent=self.autofix).run_debug(test_prompt)

        # Walk every event; each non-empty text part replaces the previous
        # one, so the value left at the end is the last text emitted.
        result_text = ""
        for event in events:
            content = getattr(event, 'content', None)
            if not content:
                continue
            parts = getattr(content, 'parts', None)
            if not parts:
                continue
            for part in parts:
                text = getattr(part, 'text', None)
                if text:
                    result_text = text

        return result_text

# Create the notebook-level dry-run tester, wrapping the auto-fix agent
# (autofix_agent is defined in an earlier cell)
safe_tester = SafeTestMode(autofix_agent)
print("‚úÖ Safe test mode ready")

‚úÖ Safe test mode ready


In [168]:
# # CELL 12: Add Interactive Temp File Functions (add to your tools section)
# def preview_temp_files() -> str:
#     """Preview what would be deleted WITHOUT deleting anything"""
#     try:
#         temp_dirs = [os.environ.get('TEMP'), os.environ.get('TMP')]
        
#         preview = {
#             "folders_to_clean": [],
#             "total_size_mb": 0,
#             "total_files": 0,
#             "sample_files": []
#         }
        
#         for temp_dir in temp_dirs:
#             if not temp_dir or not os.path.exists(temp_dir):
#                 continue
            
#             folder_size = 0
#             folder_files = 0
            
#             for filename in os.listdir(temp_dir)[:100]:
#                 try:
#                     file_path = os.path.join(temp_dir, filename)
#                     if os.path.isfile(file_path):
#                         size = os.path.getsize(file_path)
#                         folder_size += size
#                         folder_files += 1
                        
#                         if len(preview["sample_files"]) < 10:
#                             preview["sample_files"].append({
#                                 "name": filename[:60],
#                                 "size_kb": round(size / 1024, 2),
#                                 "age_days": round((datetime.now().timestamp() - os.path.getmtime(file_path)) / (24*60*60), 1)
#                             })
#                 except:
#                     continue
            
#             if folder_files > 0:
#                 preview["folders_to_clean"].append({
#                     "path": temp_dir,
#                     "files": folder_files,
#                     "size_mb": round(folder_size / (1024*1024), 2)
#                 })
#                 preview["total_size_mb"] += round(folder_size / (1024*1024), 2)
#                 preview["total_files"] += folder_files
        
#         return json.dumps(preview, indent=2)
#     except Exception as e:
#         return json.dumps({"error": str(e)})

# def clear_temp_files_interactive() -> str:
#     """Clear temp files with user confirmation"""
#     try:
#         preview = json.loads(preview_temp_files())
        
#         if preview.get("total_size_mb", 0) < 1:
#             return "‚úÖ No significant temp files to clean"
        
#         print("\n" + "="*80)
#         print("üóëÔ∏è  TEMP FILE CLEANUP PREVIEW")
#         print("="*80)
#         print(f"\nüìä Total: {preview['total_files']} files, {preview['total_size_mb']} MB")
        
#         print("\nüìÅ Locations:")
#         for folder in preview['folders_to_clean']:
#             print(f"  ‚Ä¢ {folder['path']}: {folder['files']} files, {folder['size_mb']} MB")
        
#         print("\nüìÑ Sample files:")
#         for i, file in enumerate(preview['sample_files'][:5], 1):
#             print(f"  {i}. {file['name']} ({file['size_kb']} KB, {file['age_days']} days old)")
        
#         print("\n" + "="*80)
#         print("‚ÑπÔ∏è  Your Documents, Downloads, Photos are NOT affected")
#         print("="*80)
        
#         response = input("\nü§î Delete these temp files? (yes/no): ").strip().lower()
        
#         if response not in ['yes', 'y']:
#             return "‚ùå Cleanup cancelled. No files deleted."
        
#         print("\nüîÑ Deleting...")
        
#         temp_dirs = [os.environ.get('TEMP'), os.environ.get('TMP')]
#         deleted = 0
#         freed = 0
        
#         for temp_dir in temp_dirs:
#             if not temp_dir or not os.path.exists(temp_dir):
#                 continue
            
#             for filename in os.listdir(temp_dir):
#                 try:
#                     file_path = os.path.join(temp_dir, filename)
#                     if os.path.isfile(file_path):
#                         size = os.path.getsize(file_path)
#                         os.unlink(file_path)
#                         deleted += 1
#                         freed += size
#                 except:
#                     continue
        
#         return f"‚úÖ Deleted {deleted} files, freed {freed/(1024*1024):.2f} MB"
        
#     except Exception as e:
#         return f"ERROR: {str(e)}"

# print("‚úÖ Interactive temp cleaner ready")

In [169]:
# CELL 13: Interactive OptimAIzer
class InteractiveOptimAIzer:
    """OptimAIzer with user confirmations.

    Runs the monitor and analyzer agents, then asks the user on the console
    which fixes to apply before running any of them.
    """

    # Memory usage (percent) at or above which the memory optimisation is offered.
    MEMORY_FIX_THRESHOLD = 84

    def __init__(self, monitor, analyzer, memory):
        self.monitor = monitor
        self.analyzer = analyzer
        # Stored but not used by any method of this class.
        self.memory = memory

    def _extract_text(self, responses):
        """Return the last non-empty text part found in ``responses`` ("" if none)."""
        text = ""
        for event in responses:
            if hasattr(event, 'content') and event.content:
                if hasattr(event.content, 'parts') and event.content.parts:
                    for part in event.content.parts:
                        if hasattr(part, 'text') and part.text:
                            text = part.text
        return text

    async def run_interactive(self):
        """Scan, analyze, then apply the fixes the user confirms.

        Returns:
            dict with ``monitor`` and ``analysis`` (agent text) and
            ``fixes_applied``. When at least one fix was selected it is True
            and ``fix_results`` lists each fix function's return value in the
            order run; when none was selected it is False and ``fix_results``
            is absent.

        NOTE(review): ``optimize_memory`` and ``clear_temp_files_interactive``
        must be defined in an earlier cell; confirm they are live definitions
        and not commented out.
        """
        print("\nüîç Scanning system...")
        monitor_runner = InMemoryRunner(agent=self.monitor)
        monitor_resp = await monitor_runner.run_debug("Get complete system status")
        monitor_text = self._extract_text(monitor_resp)

        print("üß† Analyzing...")
        analyzer_runner = InMemoryRunner(agent=self.analyzer)
        analyzer_resp = await analyzer_runner.run_debug(f"Analyze: {monitor_text}")
        analysis_text = self._extract_text(analyzer_resp)

        # Check memory (falls back to 0 when the monitor text has no figure)
        mem_match = re.search(r'Memory usage.*?(\d+\.?\d*)%', monitor_text, re.IGNORECASE)
        memory_pct = float(mem_match.group(1)) if mem_match else 0

        print("\n" + "="*80)
        print("üí° RECOMMENDED FIXES")
        print("="*80)

        fixes = []

        if memory_pct >= self.MEMORY_FIX_THRESHOLD:
            print("\n1Ô∏è‚É£  OPTIMIZE MEMORY (Safe - no files deleted)")
            response = input("   Apply? (y/n): ").strip().lower()
            if response in ['yes', 'y']:
                fixes.append('optimize_memory')

        print("\n2Ô∏è‚É£  CLEAN TEMP FILES (Will show preview)")
        response = input("   Check temp files? (y/n): ").strip().lower()
        if response in ['yes', 'y']:
            fixes.append('clear_temp')

        if not fixes:
            print("\n‚úÖ No fixes selected")
            return {"monitor": monitor_text, "analysis": analysis_text, "fixes_applied": False}

        print("\n" + "="*80)
        print("üîß APPLYING FIXES")
        print("="*80)

        results = []

        for fix in fixes:
            if fix == 'optimize_memory':
                print("\n‚öôÔ∏è  Optimizing memory...")
                r = optimize_memory()
                print(f"   {r}")
                results.append(r)

            elif fix == 'clear_temp':
                r = clear_temp_files_interactive()
                # Show the outcome (deleted / cancelled / error) like the
                # memory branch does; previously it was silently discarded.
                print(f"   {r}")
                results.append(r)

        print("\n‚úÖ DONE")

        return {
            "monitor": monitor_text,
            "analysis": analysis_text,
            "fixes_applied": True,
            "fix_results": results
        }

# Notebook-level instance used by the "Run It!" cell below.
# monitor_agent, analyzer_agent and memory come from earlier cells.
interactive_guard = InteractiveOptimAIzer(monitor_agent, analyzer_agent, memory)
print("‚úÖ Interactive OptimAIzer ready")

‚úÖ Interactive OptimAIzer ready


In [170]:
# CELL 14: Run It!
# Interactive scan: prompts on the console before applying any fix.
# Rebinds the notebook-level `result` with this run's output.
result = await interactive_guard.run_interactive()

# Show the user-facing report built from this run's monitor text
print("\n")
print(generate_user_report(result))

2025-11-29 20:40:33,619 - INFO - Sending out request, model: gemini-2.5-flash-lite, backend: GoogleLLMVariant.GEMINI_API, stream: False



üîç Scanning system...

 ### Created new session: debug_session_id

User > Get complete system status


2025-11-29 20:40:34,232 - INFO - Response received from the model.
2025-11-29 20:40:36,123 - INFO - Sending out request, model: gemini-2.5-flash-lite, backend: GoogleLLMVariant.GEMINI_API, stream: False
2025-11-29 20:40:38,941 - INFO - Response received from the model.
2025-11-29 20:40:38,951 - INFO - Sending out request, model: gemini-2.5-flash-lite, backend: GoogleLLMVariant.GEMINI_API, stream: False


monitor > The system status is as follows:

**CPU Usage:** The overall CPU usage is 7.2%, which is normal. However, some cores are experiencing higher usage, with one at 45.8% and another at 32.3%.

**Memory Usage:** The memory usage is at 90.7%, exceeding the recommended limit of 85%. This indicates a potential memory resource issue.

**Disk Usage:** The disk usage is at 58.9%, which is well within the normal range.

**Top Processes:**
*   CPU-intensive processes include `Code.exe` (22.4% and 21.8%), `python.exe` (12.1%), `dwm.exe` (8.1%), and `System` (7.5%).
*   Memory-intensive processes include `chrome.exe` (10.6% and 6.3%), `Code.exe` (4.7%), `Code.exe` (4.2%), and `Code.exe` (3.3%).

**GPU Usage:** The GPU utilization is at 5.0%, with 496.0 MB of memory used out of a total of 8188.0 MB.

**Network Usage:** The system has sent 493,272,719 bytes and received 8,718,165,066 bytes.

**Battery Status:** The battery is plugged in and has 93% charge remaining.

**Temperatures:** No temp

2025-11-29 20:40:41,427 - INFO - Response received from the model.


analyzer > **Analysis:**

**Memory:**
*   **Issue:** High Memory Usage
*   **Severity:** High
*   **Impact:** The system's memory usage is at 90.7%, exceeding the recommended limit of 85%. This can lead to decreased system performance, application slowdowns, and potential unresponsiveness as the system may resort to using slower disk-based swap space.

**CPU:**
*   **Issue:** Uneven CPU Core Usage, High Resource Processes
*   **Severity:** Medium
*   **Impact:** While overall CPU usage is low, individual cores are experiencing high utilization (45.8% and 32.3%). This indicates that specific processes may be bottlenecking performance on those cores. `Code.exe` (multiple instances) and `python.exe` are identified as major CPU consumers, contributing to the uneven load. This could lead to slower execution of tasks handled by these processes.

**Disk:**
*   **Severity:** Low
*   **Impact:** Disk usage is at 58.9%, which is normal and poses no immediate threat to system performance.

**Proc