# plugin

> Plugin implementation for NVIDIA GPU monitoring using nvitop

In [None]:
#| default_exp plugin

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import logging
import psutil
import subprocess
from typing import Any, Dict, Optional

from cjm_infra_plugin_system.plugin_interface import MonitorPlugin
from cjm_infra_plugin_system.core import SystemStats
from cjm_plugin_system.utils.validation import dataclass_to_jsonschema

## NvidiaMonitorPlugin

NVIDIA GPU monitoring plugin built on the `nvitop` library. It provides real-time hardware telemetry for:

- **GPU Memory**: Free, used, and total VRAM across all visible devices
- **GPU Utilization**: Compute load percentage (the highest value across all visible devices)
- **System RAM**: Via psutil for cross-platform support
- **CPU**: Overall utilization percentage

Falls back to `nvidia-smi` if `nvitop` is not available.

In [None]:
#| export
class NvidiaMonitorPlugin(MonitorPlugin):
    """NVIDIA System Monitor using nvitop.

    Reports host CPU/RAM figures via `psutil` and per-GPU memory and
    utilization via `nvitop`. When `nvitop` cannot be imported the plugin
    falls back to parsing the output of the `nvidia-smi` command line tool.
    GPU probing is best-effort: failures are logged and the plugin reports
    "no GPU" instead of raising.
    """

    def __init__(self):
        """Initialize the NVIDIA monitor plugin."""
        self.logger = logging.getLogger(f"{__name__}.{type(self).__name__}")
        self.config = {}

    @property
    def name(self) -> str:  # Plugin identifier
        """Plugin name."""
        return "sys-mon-nvidia"

    @property
    def version(self) -> str:  # Plugin version
        """Plugin version."""
        return "1.0.0"

    def initialize(
        self,
        config: Optional[Dict[str, Any]] = None  # Configuration dictionary
    ) -> None:
        """Initialize or reconfigure the plugin."""
        self.config = config or {}
        self.logger.info("NvidiaMonitor initialized")

    def get_config_schema(self) -> Dict[str, Any]:  # JSON Schema
        """Return JSON Schema for configuration."""
        return {}  # No config needed for monitoring

    def get_current_config(self) -> Dict[str, Any]:  # Current config
        """Return current configuration."""
        return self.config

    def cleanup(self) -> None:
        """Clean up resources."""
        pass

    @staticmethod
    def _parse_smi_number(
        field: str  # One CSV field emitted by nvidia-smi
    ) -> int:  # Parsed integer, or 0 when the field is not numeric
        """Convert an nvidia-smi CSV field to int, mapping placeholders to 0."""
        try:
            return int(float(field))
        except (ValueError, OverflowError):
            # Covers '', 'N/A', '[N/A]', '[Not Supported]' and non-finite values,
            # so one unsupported metric does not discard the whole device row.
            return 0

    def _collect_with_nvitop(
        self,
        gpu_info: Dict[str, Any]  # Result dictionary, updated in place
    ) -> None:
        """Fill `gpu_info` from nvitop; raises ImportError if nvitop is missing."""
        from nvitop import Device, NA

        def _or_zero(value):
            # nvitop returns its NA sentinel for metrics a device does not expose
            return 0 if value == NA else value

        devices = Device.all()
        if not devices:
            return

        gpu_info['available'] = True
        gpu_info['type'] = 'NVIDIA'

        for i, device in enumerate(devices):
            gpu_info['details'][f'gpu_{i}'] = {
                'name': device.name(),
                # Raw values are divided by 1024**2 to match the *_mb fields
                'memory_total': _or_zero(device.memory_total()) // (1024**2),
                'memory_used': _or_zero(device.memory_used()) // (1024**2),
                'memory_free': _or_zero(device.memory_free()) // (1024**2),
                # Queried once per device (each call hits the driver)
                'utilization': _or_zero(device.gpu_utilization()),
            }

    def _collect_with_nvidia_smi(
        self,
        gpu_info: Dict[str, Any]  # Result dictionary, updated in place
    ) -> None:
        """Fill `gpu_info` by parsing `nvidia-smi` CSV output; never raises."""
        try:
            result = subprocess.run(
                ['nvidia-smi', '--query-gpu=name,memory.total,memory.used,memory.free,utilization.gpu',
                 '--format=csv,noheader,nounits'],
                capture_output=True, text=True, timeout=2
            )
        except FileNotFoundError:
            # Expected on hosts without the NVIDIA driver; keep it quiet
            self.logger.debug("nvidia-smi not found; reporting no GPU")
            return
        except subprocess.TimeoutExpired:
            self.logger.warning("nvidia-smi timed out; reporting no GPU")
            return
        except Exception as e:
            # Best-effort probe: log instead of propagating to the caller
            self.logger.warning("nvidia-smi failed: %s", e)
            return

        if result.returncode != 0:
            self.logger.debug("nvidia-smi exited with code %s", result.returncode)
            return

        for i, line in enumerate(result.stdout.strip().split('\n')):
            # Split from the right so a comma inside the device name cannot
            # shift the four numeric columns.
            parts = [p.strip() for p in line.rsplit(',', 4)]
            if len(parts) < 5:
                continue
            gpu_info['available'] = True
            gpu_info['type'] = 'NVIDIA'
            gpu_info['details'][f'gpu_{i}'] = {
                'name': parts[0],
                'memory_total': self._parse_smi_number(parts[1]),
                'memory_used': self._parse_smi_number(parts[2]),
                'memory_free': self._parse_smi_number(parts[3]),
                'utilization': self._parse_smi_number(parts[4]),
            }

    def _get_gpu_info_internal(self) -> Dict[str, Any]:  # Raw GPU data
        """Check for GPU availability and get info using nvitop.

        Returns a dictionary with the keys `available` (bool), `type`
        ('NVIDIA' or 'None'), `details` (one `gpu_<index>` entry per device
        with name, memory_total, memory_used, memory_free and utilization)
        and `processes` (always an empty list here).
        """
        gpu_info = {'available': False, 'type': 'None', 'details': {}, 'processes': []}

        try:
            self._collect_with_nvitop(gpu_info)
        except ImportError:
            # nvitop not available, fallback to nvidia-smi
            self._collect_with_nvidia_smi(gpu_info)
        except Exception as e:
            self.logger.warning("Error checking GPU: %s", e)

        return gpu_info

    def execute(
        self,
        command: str = "get_system_status",  # Command to execute
        **kwargs
    ) -> Dict[str, Any]:  # SystemStats as dictionary
        """Collect stats and return standardized SystemStats dictionary.

        Raises ValueError for any command other than "get_system_status".
        Extra keyword arguments are accepted and ignored.
        """
        if command != "get_system_status":
            raise ValueError(f"Unknown command: {command}")

        # 1. Get Host CPU/RAM (psutil)
        vm = psutil.virtual_memory()

        # 2. Get GPU Data
        gpu_raw = self._get_gpu_info_internal()

        # 3. Aggregate GPU Stats for the Scheduler:
        #    memory is summed across devices, load is the busiest device.
        total_vram_free = 0
        total_vram_total = 0
        total_vram_used = 0
        max_load = 0

        if gpu_raw['available']:
            for det in gpu_raw['details'].values():
                total_vram_free += det.get('memory_free', 0)
                total_vram_total += det.get('memory_total', 0)
                total_vram_used += det.get('memory_used', 0)
                max_load = max(max_load, det.get('utilization', 0))

        # 4. Return Standardized Object as dict
        stats = SystemStats(
            cpu_percent=psutil.cpu_percent(),
            memory_used_mb=vm.used / (1024**2),
            memory_total_mb=vm.total / (1024**2),
            memory_available_mb=vm.available / (1024**2),
            gpu_type=gpu_raw['type'],
            gpu_free_memory_mb=float(total_vram_free),
            gpu_total_memory_mb=float(total_vram_total),
            gpu_used_memory_mb=float(total_vram_used),
            gpu_load_percent=float(max_load),
            details=gpu_raw
        )
        return stats.to_dict()

### Usage Example

```python
# Direct usage (for testing)
plugin = NvidiaMonitorPlugin()
plugin.initialize()
stats = plugin.execute()
print(f"GPU Free: {stats['gpu_free_memory_mb']}MB")
print(f"RAM Available: {stats['memory_available_mb']}MB")
```

### Integration with PluginManager

```python
from cjm_plugin_system.core.manager import PluginManager
from cjm_plugin_system.core.scheduling import SafetyScheduler

manager = PluginManager(scheduler=SafetyScheduler())
manager.load_all()

# Register this plugin as the system monitor
manager.register_system_monitor("cjm-system-monitor-nvidia")

# Now scheduling checks will use real GPU stats
result = manager.execute_plugin("whisper-local", audio="/path/to/audio.wav")
```

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()