In [1]:
import psutil
import time
# NVML support is optional: NVML_AVAILABLE records whether pynvml could be
# imported and initialised in this kernel.
try:
    import pynvml
    pynvml.nvmlInit()
    NVML_AVAILABLE = True
except Exception:
    # Catch ordinary failures only (missing package, nvmlInit() raising).
    # A bare `except:` would also swallow KeyboardInterrupt / SystemExit.
    NVML_AVAILABLE = False
from nvitop import Device, CudaDevice, MigDevice,NA

class SystemMetrics:
    """Samples CPU, memory, disk, network and GPU readings via psutil and nvitop.

    Disk and network throughput are derived from the difference between two
    successive counter readings, so each of those getters reports the average
    rate since its own previous call (or since construction for the first call).
    """

    # Value reported for any GPU field that nvitop returns as ``NA``.
    GPU_VALUE_UNAVAILABLE = -1

    def __init__(self):
        self.prev_read_bytes = 0
        self.prev_write_bytes = 0
        self.prev_net_bytes_recv = 0
        self.prev_net_bytes_sent = 0
        now = time.time()
        # Kept for backward compatibility with code that reads this attribute;
        # the rate calculations use the per-source timestamps below.
        self.prev_time = now
        # Disk and network each need their own "last sampled" timestamp.
        # With one shared timestamp, whichever getter ran second saw an
        # interval of ~0 s and reported a wildly inflated rate.
        self.prev_disk_time = now
        self.prev_net_time = now
        self._initialize_counters()
        self.devices = Device.all()

    def _initialize_counters(self):
        """Seed the previous-counter fields from the current psutil totals."""
        net_io = psutil.net_io_counters()
        if net_io is not None:  # psutil returns None without network interfaces
            self.prev_net_bytes_recv = net_io.bytes_recv
            self.prev_net_bytes_sent = net_io.bytes_sent
        disk_io = psutil.disk_io_counters()
        if disk_io is not None:  # psutil returns None on diskless machines
            self.prev_read_bytes = disk_io.read_bytes
            self.prev_write_bytes = disk_io.write_bytes

    @staticmethod
    def _mib_per_second(current_bytes, previous_bytes, elapsed):
        """Return the byte-counter delta as MiB per second over ``elapsed`` seconds."""
        return (current_bytes - previous_bytes) / (1024 ** 2) / elapsed

    def get_cpu_metrics(self):
        """Return per-CPU utilisation, per-CPU frequencies and RAM usage percent.

        Returns:
            dict with keys ``cpu_percentages``, ``cpu_freqs`` and ``mem_percent``.
        """
        return {
            'cpu_percentages': psutil.cpu_percent(percpu=True),
            'cpu_freqs': psutil.cpu_freq(percpu=True),
            'mem_percent': psutil.virtual_memory().percent
        }

    def get_disk_metrics(self):
        """Return disk throughput since the previous call and usage of ``/``.

        Returns:
            dict with ``read_speed`` / ``write_speed`` in MiB/s and
            ``disk_used`` / ``disk_total`` in bytes. Speeds are 0.0 when psutil
            reports no disk counters.
        """
        current_time = time.time()
        io_counters = psutil.disk_io_counters()
        disk_usage = psutil.disk_usage('/')

        # Clamp so two calls in the same instant cannot divide by zero.
        time_delta = max(current_time - self.prev_disk_time, 1e-6)

        if io_counters is None:
            read_speed = 0.0
            write_speed = 0.0
        else:
            read_speed = self._mib_per_second(
                io_counters.read_bytes, self.prev_read_bytes, time_delta)
            write_speed = self._mib_per_second(
                io_counters.write_bytes, self.prev_write_bytes, time_delta)
            self.prev_read_bytes = io_counters.read_bytes
            self.prev_write_bytes = io_counters.write_bytes
        self.prev_disk_time = current_time

        return {
            'read_speed': read_speed,
            'write_speed': write_speed,
            'disk_used': disk_usage.used,
            'disk_total': disk_usage.total
        }

    def get_network_metrics(self):
        """Return network throughput since the previous call.

        Returns:
            dict with ``download_speed`` and ``upload_speed`` in MiB/s. Both are
            0.0 when psutil reports no network counters.
        """
        current_time = time.time()
        net_io_counters = psutil.net_io_counters()

        # Clamp so two calls in the same instant cannot divide by zero.
        time_delta = max(current_time - self.prev_net_time, 1e-6)

        if net_io_counters is None:
            download_speed = 0.0
            upload_speed = 0.0
        else:
            download_speed = self._mib_per_second(
                net_io_counters.bytes_recv, self.prev_net_bytes_recv, time_delta)
            upload_speed = self._mib_per_second(
                net_io_counters.bytes_sent, self.prev_net_bytes_sent, time_delta)
            self.prev_net_bytes_recv = net_io_counters.bytes_recv
            self.prev_net_bytes_sent = net_io_counters.bytes_sent
        self.prev_net_time = current_time

        return {
            'download_speed': download_speed,
            'upload_speed': upload_speed
        }

    @classmethod
    def _gpu_value(cls, reading, divisor=None):
        """Map an nvitop reading to a plain number, using the sentinel for ``NA``."""
        if reading is NA:
            return cls.GPU_VALUE_UNAVAILABLE
        return reading if divisor is None else reading / divisor

    def get_gpu_metrics(self):
        """Return the metrics of the first device in ``self.devices``.

        Every device is sampled, but only the first entry is returned, matching
        the existing contract. Memory figures are in GiB; any value nvitop
        reports as ``NA`` becomes -1. With no devices at all, a dict with every
        field set to -1 is returned instead of raising ``IndexError``.
        """
        gib = 1024 ** 3
        gpu_metrics = []
        for device in self.devices:
            with device.oneshot():
                # Query each accessor once; the original queried every one twice
                # (once for the NA check, once for the value).
                gpu_metrics.append({
                    'gpu_util': self._gpu_value(device.gpu_utilization()),
                    'mem_used': self._gpu_value(device.memory_used(), gib),
                    'mem_total': self._gpu_value(device.memory_total(), gib),
                    'temperature': self._gpu_value(device.temperature()),
                    'fan_speed': self._gpu_value(device.fan_speed()),
                })

        if not gpu_metrics:
            return {
                'gpu_util': self.GPU_VALUE_UNAVAILABLE,
                'mem_used': self.GPU_VALUE_UNAVAILABLE,
                'mem_total': self.GPU_VALUE_UNAVAILABLE,
                'temperature': self.GPU_VALUE_UNAVAILABLE,
                'fan_speed': self.GPU_VALUE_UNAVAILABLE,
            }
        return gpu_metrics[0]


Your installed package `nvidia-ml-py` is corrupted. Skip patch functions `nvmlDeviceGet{Compute,Graphics,MPSCompute}RunningProcesses`. You may get incorrect or incomplete results. Please consider reinstall package `nvidia-ml-py` via `pip3 install --force-reinstall nvidia-ml-py nvitop`.
Your installed package `nvidia-ml-py` is corrupted. Skip patch functions `nvmlDeviceGetMemoryInfo`. You may get incorrect or incomplete results. Please consider reinstall package `nvidia-ml-py` via `pip3 install --force-reinstall nvidia-ml-py nvitop`.
Your installed package `nvidia-ml-py` is corrupted. Skip patch functions `nvmlDeviceGetMemoryInfo`. You may get incorrect or incomplete results. Please consider reinstall package `nvidia-ml-py` via `pip3 install --force-reinstall nvidia-ml-py nvitop`.


In [25]:
from textual.app import ComposeResult
from textual.widgets import Static
from .base import MetricWidget
import plotext as plt
from ..utils.formatting import ansi2rich, align
import psutil
import platform

class CPUWidget(MetricWidget):
    """CPU usage display widget with per-physical processor metrics.

    Renders one horizontal bar per physical CPU package, a RAM usage bar and a
    disk usage bar into a single ``Static`` child with id ``cpu-content``.
    """
    DEFAULT_CSS = """
    CPUWidget {
        height: 100%;
        border: solid green;
        background: $surface;
        layout: vertical;
    }
    
    .metric-title {
        text-align: left;
    }
    
    .cpu-metric-value {
        height: 1fr;
    }
    """
    def __init__(self, title: str, id: str = None):
        super().__init__(title=title, id=id)
        self.title = title
        self.border_title = title

    def compose(self) -> ComposeResult:
        """Yield the single Static that ``update_content`` writes into."""
        yield Static("", id="cpu-content", classes="cpu-metric-value")

    @staticmethod
    def _fallback_cpu_mapping():
        """Return a mapping that groups every logical core under physical id 0."""
        # psutil.cpu_count() may return None when the count is undetermined.
        logical_count = psutil.cpu_count(logical=True) or 1
        return {0: list(range(logical_count))}

    @staticmethod
    def _record_processor(mapping, cpu_info):
        """Add one parsed /proc/cpuinfo stanza to ``mapping`` if it is complete."""
        if "processor" in cpu_info and "physical id" in cpu_info:
            proc_id = int(cpu_info["processor"])
            phys_id = int(cpu_info["physical id"])
            mapping.setdefault(phys_id, []).append(proc_id)

    def get_physical_cpu_mapping(self):
        """Map each physical CPU id to the list of its logical processor indices.

        On Linux the mapping is parsed from ``/proc/cpuinfo``. On other systems,
        on any parsing error, or when the file yields no ``physical id`` entries
        at all, every logical core is grouped under physical id 0.

        Returns:
            dict[int, list[int]]: physical id -> logical processor indices.
        """
        if platform.system() != "Linux":
            return self._fallback_cpu_mapping()

        mapping = {}
        try:
            with open("/proc/cpuinfo") as f:
                cpu_info = {}
                for line in f:
                    if line.strip() == "":
                        # A blank line terminates one processor stanza.
                        self._record_processor(mapping, cpu_info)
                        cpu_info = {}
                    else:
                        key, _, value = line.partition(":")
                        cpu_info[key.strip()] = value.strip()
                # Flush the final stanza in case the file lacks a trailing blank line.
                self._record_processor(mapping, cpu_info)
        except Exception:
            # Fallback: assume one physical CPU with all logical cores
            return self._fallback_cpu_mapping()

        # An empty mapping (no "physical id" lines) would render an empty chart.
        return mapping or self._fallback_cpu_mapping()

    def get_physical_cpu_usage(self, interval=1):
        """Return the mean utilisation of the logical cores of each physical CPU.

        Args:
            interval: passed to ``psutil.cpu_percent``; a positive value makes
                that call block for the given number of seconds.

        Returns:
            dict[int, float]: physical id -> average percent over its cores.
        """
        logical_usage = psutil.cpu_percent(interval=interval, percpu=True)
        mapping = self.get_physical_cpu_mapping()
        physical_usage = {}
        for phys_id, logical_indices in mapping.items():
            # Ignore indices psutil did not report so a mismatch between the
            # mapping and the sample cannot raise IndexError.
            samples = [logical_usage[i] for i in logical_indices
                       if 0 <= i < len(logical_usage)]
            if samples:
                physical_usage[phys_id] = sum(samples) / len(samples)
        return physical_usage

    def create_disk_usage_bar(self, disk_used: float, disk_total: float, total_width: int = 40) -> str:
        """Build a two-line Rich-markup disk usage bar.

        Args:
            disk_used: used space in bytes.
            disk_total: total space in bytes; 0 yields a placeholder message.
            total_width: overall width in characters; the bar uses two fewer.

        Returns:
            A header line with used/free GB followed by the coloured bar.
        """
        if disk_total == 0:
            return "No disk usage data..."

        usage_percent = (disk_used / disk_total) * 100
        available = disk_total - disk_used

        usable_width = total_width - 2
        used_blocks = int((usable_width * usage_percent) / 100)
        # Keep the used segment inside the bar even if used > total is reported.
        used_blocks = min(max(used_blocks, 0), max(usable_width, 0))
        free_blocks = usable_width - used_blocks

        usage_bar = f"[magenta]{'█' * used_blocks}[/][cyan]{'█' * free_blocks}[/]"

        used_gb = disk_used / (1024 ** 3)
        available_gb = available / (1024 ** 3)
        used_gb_txt = align(f"{used_gb:.1f} GB USED", total_width // 2 - 2, "left")
        free_gb_txt = align(f"FREE: {available_gb:.1f} GB ", total_width // 2 - 2, "right")
        return f"[magenta]{used_gb_txt}[/]DISK[cyan]{free_gb_txt}[/]\n {usage_bar}"

    def create_bar_chart(self, cpu_percentages, mem_percent, labels, width, height):
        """Render the CPU bars followed by the RAM bar as one markup string.

        Args:
            cpu_percentages: one value per bar.
            mem_percent: RAM usage percent for the single RAM bar.
            labels: bar labels, parallel to ``cpu_percentages``.
            width: plot width; two columns are added for each chart.
            height: accepted but not used; the CPU chart height is derived
                from the number of bars and the RAM chart is fixed at 4 rows.
        """
        # Create CPU usage bar chart
        plt.clear_figure()
        plt.theme("pro")
        plt.plot_size(width=width + 2, height=len(cpu_percentages) + 2)
        plt.xfrequency(0)
        plt.xlim(5, 100)
        plt.bar(labels, cpu_percentages, orientation="h")
        cpubars = ansi2rich(plt.build()).replace("\x1b[0m", "").replace("\x1b[1m", "")

        # Create RAM usage bar chart
        plt.clear_figure()
        plt.theme("pro")
        plt.plot_size(width=width + 2, height=4)
        plt.xticks([1, 25, 50, 75, 100], ["0", "25", "50", "75", "100"])
        plt.xlim(5, 100)
        plt.bar(["RAM"], [mem_percent], orientation="h")
        rambars = ansi2rich(plt.build()).replace("\x1b[0m", "").replace("\x1b[1m", "")

        return cpubars + rambars

    def update_content(self, mem_percent, disk_used, disk_total, interval=1):
        """Re-sample CPU usage and redraw the CPU, RAM and disk charts.

        Args:
            mem_percent: RAM usage percent to plot.
            disk_used: used disk space in bytes.
            disk_total: total disk space in bytes.
            interval: sampling window forwarded to ``get_physical_cpu_usage``.
        """
        # NOTE(review): with interval > 0 the psutil sample blocks this call
        # for that long; confirm the caller does not run it on the UI loop.
        # Get per-physical CPU usage metrics
        physical_usage = self.get_physical_cpu_usage(interval=interval)
        sorted_phys_ids = sorted(physical_usage.keys())
        cpu_percentages = [physical_usage[pid] for pid in sorted_phys_ids]
        labels = [f"P{pid}" for pid in sorted_phys_ids]

        # Determine available plot space
        width = self.size.width - 4
        height = self.size.height - 2

        # Build charts
        cpuram_chart = self.create_bar_chart(cpu_percentages, mem_percent, labels, width, height)
        disk_chart = self.create_disk_usage_bar(disk_used, disk_total, width + 2)
        self.query_one("#cpu-content").update(cpuram_chart + "\n" + disk_chart)


ImportError: attempted relative import with no known parent package

In [67]:
from ground_control.widgets import gpu
import importlib
from rich import print

# Reload so edits to the widget module take effect without restarting the kernel.
importlib.reload(gpu)

# Build a widget outside a running app and give it a fixed plot area.
gpuw = gpu.GPUWidget("GPU")
gpuw.plot_height, gpuw.plot_width = 10, 40

gpuw.update_content(50, 78)

# Pad both histories with four more identical samples so the dual plot has
# several points to draw.
for repeat in range(4):
    gpuw.gpu_ram_history.append(50)
    gpuw.gpu_usage_history.append(78)

print(gpuw.create_center_bar(20, 50, 40))
print(gpuw.get_dual_plot())

In [None]:
import plotext as plt
import re

def ansi2rich(text: str) -> str:
    """Turn ANSI ``38;5;<n>`` foreground runs into Rich markup and drop the rest.

    A run is a ``\\x1b[38;5;<n>m`` opener, some text on a single line, and a
    ``\\x1b[0m`` reset. Runs whose code is in the table below are wrapped in the
    matching Rich tag; runs with any other code keep their text unstyled.
    Every ANSI SGR sequence still left afterwards is removed.

    Args:
        text: string that may contain ANSI escape sequences.

    Returns:
        The text with known colour runs as ``[style]...[/]`` and no ANSI codes.
    """
    # ANSI 256-colour code -> Rich style name; extend as more codes are needed.
    styles_by_code = {
        '12': 'blue',
        '10': 'green',
        '7': 'bold',
        '2': "magenta",
    }

    def to_markup(found):
        code, body = found.groups()
        style = styles_by_code.get(code)
        return f"[{style}]{body}[/]" if style else body

    # First pass: rewrite complete opener/text/reset runs.
    converted = re.sub(r'\x1b\[38;5;(\d+)m(.*?)\x1b\[0m', to_markup, text)

    # Second pass: strip whatever SGR sequences were not part of such a run.
    return re.sub(r'\x1b\[[0-9;]*m', '', converted)
# Demo: draw a single horizontal "RAM" bar at 35 with plotext and convert the
# built string with ansi2rich.
plt.clear_figure()
plt.theme("pro")
plt.plot_size(width=20+1, height=4)
# Tick positions 1, 25, 50, 75, 100 are labelled "0" .. "100".
plt.xticks([1, 25, 50, 75, 100], ["0", "25", "50", "75", "100"])
plt.xlim(5, 100)
plt.bar(["RAM"], [35], orientation="h", color="green")
# Last expression of the cell, so the converted chart string is the cell output.
str(ansi2rich(plt.build()))

'   ┌────────────────┐\nRAM┤██████          │\n   └───┬───┬───┬───┬┘\n   0  25  50  75 100 \n'

In [65]:
# NOTE(review): `positive_series` is not assigned in any cell visible here, so
# this cell depends on leftover kernel state — define it in the notebook.
positive_series

[50.1, 50.1, 50.1, 50.1]

In [None]:
# Plot the series as a braille line chart sized to the GPU widget's plot area.
# Relies on `plt`, `gpuw` and `positive_series` from earlier cells.
# NOTE(review): only the data is cleared, so figure settings left by earlier
# cells (ticks, limits) may still apply — confirm clear_figure() is not wanted.
plt.clear_data()  # was called twice in a row; one call is sufficient
plt.plot_size(height=gpuw.plot_height, width=gpuw.plot_width-1)
plt.theme("pro")

plt.plot(positive_series, marker="braille", label="GPU RAM")
plt.show()

In [5]:
import nvitop
# List every device returned by get_all_gpu_devices() as "<name> <index>".
# NOTE(review): get_all_gpu_devices is not defined in any cell visible here.
# Accessors probed during exploration and left disabled: utilization_rates(),
# memory_used()/memory_total(), is_available(), temperature(), fan_speed(),
# processes(), utilization(), memory_info(), ecc_errors().
for gpu_device in get_all_gpu_devices():
    print(gpu_device.name(), gpu_device.index)

NVIDIA A100-PCIE-40GB MIG 3g.20gb (0, 0)
NVIDIA A100-PCIE-40GB MIG 3g.20gb (0, 1)
NVIDIA A100-PCIE-40GB MIG 3g.20gb (1, 0)
NVIDIA A100-PCIE-40GB MIG 3g.20gb (1, 1)
NVIDIA A100-PCIE-40GB 2
NVIDIA A100-PCIE-40GB MIG 3g.20gb (3, 0)
NVIDIA A100-PCIE-40GB MIG 3g.20gb (3, 1)
NVIDIA A100-PCIE-40GB 4
