In [1]:
import time
import asyncio
import pynvml
import subprocess
import pandas as pd
import numpy as np
from typing import Mapping
from functools import cached_property
from emt.power_groups.power_group import PowerGroup


In [2]:
class PowerIntegrator:
    """
    Integrates the instantaneous power usages (W) over the time-delta between the previous call.
    This performs a definite integral of the instantaneous power, using a high-resolution timer,
    the timer measures the time passed since the last call and integrates the power using the
    trapezoidal rule.
    """

    def __init__(self):
        self._previous_time = time.perf_counter()
        self._previous_power = 0.0
        self._energy = 0

    def __call__(self, current_power):
        """
        Add an instantaneous power value (in watts) for a power zone and calculate the cumulative energy
        consumption in Joules.
        Args:
            power_watt (float): Instantaneous power usage in watts.
        Returns:
            float: Cumulative energy consumption in watt-seconds.
        """
        energy_delta = 0
        current_time = time.perf_counter()
        time_delta = current_time - self._previous_time
        # Calculate the energy consumed during this time interval using the trapezoidal rule
        energy_delta = ((current_power + self._previous_power) / 2.0) * time_delta
        self._energy += energy_delta

        # Update the last time for the next call
        self._previous_time = current_time
        return self._energy


In [3]:
pynvml.nvmlInit()
zones = []
power_integrators = []
for index in range(pynvml.nvmlDeviceGetCount()):
    zone_handle = pynvml.nvmlDeviceGetHandleByIndex(index)
    zones.append(zone_handle)
    power_integrators.append(PowerIntegrator())

In [4]:
pynvml.nvmlDeviceGetCount(), zones, power_integrators

(1,
 [<pynvml.nvml.LP_struct_c_nvmlDevice_t at 0x1bf07639bc0>],
 [<__main__.PowerIntegrator at 0x1bf076653d0>])

In [5]:
names = [ pynvml.nvmlDeviceGetIndex(zone) for zone in zones]
names

[0]

In [8]:
zones

[<pynvml.nvml.LP_struct_c_nvmlDevice_t at 0x1bf07639bc0>]

In [9]:
pynvml.nvmlDeviceGetPowerUsage(zones[0])

NVMLError_NoData: No data

In [7]:
command = "nvidia-smi  pmon -c 1"
output = subprocess.check_output(command, shell=True, text=True)
output

CalledProcessError: Command 'nvidia-smi  pmon -c 1' returned non-zero exit status 255.

In [16]:
! nvidia-smi --query-gpu=utilization.gpu,utilization.memory --format=csv


utilization.gpu [%], utilization.memory [%]
0 %, 0 %


In [None]:
lines = output.rstrip().split("\n")
header = lines[0][1:].split()  # Extract field names from the header
# the second line is units, data begins at the third line
data = [line.split() for line in lines[2:] if line.strip()]
df = pd.DataFrame(data, columns=header)[["gpu", "pid", "sm", "mem"]]
df = df.apply(pd.to_numeric, errors="coerce")
# filter out pids that are not relevant
filter = df['pid'].apply(_filter)
df_system = df.drop(columns=['pid'])
df_processes  = df_system[filter]
df_processes= df_processes.groupby('gpu').sum().fillna(0.0)
df_processes.to_dict(orient='index')

In [19]:
# Get the number of GPUs
device_count = pynvml.nvmlDeviceGetCount()
device_count

1

In [18]:
import pynvml

# Initialize NVML
pynvml.nvmlInit()

# Get the number of GPUs
device_count = pynvml.nvmlDeviceGetCount()

# Loop through each GPU
for device_index in range(device_count):
    handle = pynvml.nvmlDeviceGetHandleByIndex(device_index)
    print(f"GPU {device_index}: {pynvml.nvmlDeviceGetName(handle)}")
    # Get running processes on the GPU
    processes = pynvml.nvmlDeviceGetComputeRunningProcesses(handle)
    
    for process in processes:
        pid = process.pid
        gpu_utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
        memory_used = pynvml.nvmlDeviceGetMemoryInfo(handle).used
        
        print(f"PID: {pid}, GPU Utilization: {gpu_utilization.gpu}%, Memory Used: {memory_used / (1024 ** 2)} MB")

# Shutdown NVML
pynvml.nvmlShutdown()


GPU 0: NVIDIA RTX A1000 6GB Laptop GPU
PID: 2956, GPU Utilization: 6%, Memory Used: 455.02734375 MB
PID: 6964, GPU Utilization: 6%, Memory Used: 455.02734375 MB
PID: 15232, GPU Utilization: 6%, Memory Used: 455.02734375 MB
PID: 10200, GPU Utilization: 6%, Memory Used: 455.02734375 MB
PID: 14280, GPU Utilization: 6%, Memory Used: 455.02734375 MB


In [None]:
def _read_utilized_energy(self):
    """
    """    
    energy_zones = {zone: 0.0 for zone in self.zones}
    for zone, zone_handle, integrator in zip(
        self.zones, self._zones, self._power_integrators
    ):
        try:
            # Retrieves power usage in mW, divide by 1000 to get in W.
            # Measure total energy consumption at this point in time
            current_total_energy = pynvml.nvmlDeviceGetTotalEnergyConsumption(zone_handle)
            # get the zone level energy used
            zone_energy_used = integrator(current_total_energy)
            zone_memory_used = pynvml.nvmlDeviceGetMemoryInfo(zone_handle).total
            print(f"Zone: {zone_handle}, energy_used: {zone_energy_used}, memory_used: {zone_memory_used}")
            # Get running processes on the GPU
            processes = pynvml.nvmlDeviceGetComputeRunningProcesses(zone_handle)
            # Filter processes based on self.pids
            filtered_processes = [
                process for process in processes
                if process.pid in self.pids
            ]
            for process in filtered_processes:
                pid = process.pid
                memory_used = process.usedGpuMemory  # Memory used by this specific process   
                # Here you might estimate energy usage based on memory usage or other metrics
                # This is a simplistic approach and might not be accurate
                estimated_energy_usage = (memory_used / zone_memory_used) * zone_energy_used
                print(f"PID: {pid}, Memory Used: {memory_used / (1024 ** 2)} MB, Estimated Energy Used: {estimated_energy_usage:.2f} J")
        except pynvml.NVMLError:
            raise Exception
        # get time elapsed since
    return energy_zones

In [None]:
def _read_energy(self):
        """
        Retrieves instantaneous power usages (W) of all GPUs in use by the tracked processes.
        Integrates the power using the corresponding power integrator for the zone, reports
        the cumulative energy fro each zone.
        """
        energy_zones = {zone: 0.0 for zone in self.zones}
        for zone, zone_handle, integrator in zip(
            self.zones, self._zones, self._power_integrators
        ):
            try:
                # Retrieves power usage in mW, divide by 1000 to get in W.
                power_usage = pynvml.nvmlDeviceGetPowerUsage(zone_handle) / 1000
                energy_zones[zone] = integrator(power_usage)
            except pynvml.NVMLError:
                raise Exception
            # get time elapsed since
        return energy_zones
def _read_utilization(self) -> Mapping[int, float]:
        """
        This method provides utilization (per-zone) of the compute devices by the tracked
        processes.The is used to attribute a proportionate energy credit to the processes.

        """
        def _filter(pid):
            """
            The filter masks out the `pid` entries not tracked 
            by the energy monitor and returns the boolean mask.
            """
            keep = False
            if not np.isnan(pid):
                keep = True if int(pid) in self.pids else False
            return keep
        
        command = "nvidia-smi  pmon -c 1"
        output = subprocess.check_output(command, shell=True, text=True)
        lines = output.rstrip().split("\n")
        header = lines[0][1:].split()  # Extract field names from the header
        # the second line is units, data begins at the third line
        data = [line.split() for line in lines[2:] if line.strip()]
        df = pd.DataFrame(data, columns=header)[["gpu", "pid", "sm", "mem"]]
        df = df.apply(pd.to_numeric, errors="coerce")
        # filter out pids that are not relevant
        filter = df['pid'].apply(_filter)
        df_system = df.drop(columns=['pid'])
        df_processes  = df_system[filter]
        df_processes= df_processes.groupby('gpu').sum().fillna(0.0)
        return df_processes.to_dict(orient='index')
async def commence(self) -> None:
        """
        This commence a periodic execution at a set rate:
            [get_energy_trace -> update_energy_consumption -> async_wait]

        The periodic execution is scheduled at the rate dictated by `self.sleep_interval`, during the
        instantiation. The energy consumption is updated using the `_read_energy` and `_read_utilization`
        methods. The method credits energy consumption to the tracked processes by weighting the energy
        trace, obtained from each zone, by the utilization of the zone by the processes.
        """
        while True:
            utilization_trace = self._read_utilization()
            energy_trace = self._read_energy()

            self._count_trace_calls += 1
            self.logger.debug(
                f"Obtained energy trace no.{self._count_trace_calls} from {type(self).__name__ }:\n"
                f"utilization: {utilization_trace}\n"
                f"energy:     {energy_trace}"
            )

            for zone in utilization_trace:    
                #fmt: off
                self._consumed_energy += (
                    energy_trace[zone] * utilization_trace[zone]['sm']
                )
                # fmt: on

            await asyncio.sleep(self.sleep_interval)