# Telemetry - Compute

In this notebook we will learn how to read telemetry data about a Duckiebot's compute subsystem (CPU, GPU, memory, and disk) over HTTP, and then we will wrap that logic into reusable telemetry components, one per category.

In [None]:
# enable IPython's autoreload extension so that edits to imported modules
# are picked up without restarting the kernel
%load_ext autoreload
%autoreload 2

# TODO: change this to the name of your Duckiebot
# (the cells below use it to build the robot's `http://<name>.local/health/...` URLs)
VEHICLE_NAME: str = "db21j3"

In [None]:
# Telemetry - Compute - Read raw data from HTTP

import requests
from typing import List

# health API categories to query, one HTTP request per category
categories: List[str] = ['cpu', 'gpu', 'memory', 'disk']

# seconds to wait for the robot to answer; without a timeout `requests.get`
# can block this cell indefinitely when the robot is off or unreachable
HTTP_TIMEOUT_SECS: float = 5.0

for category in categories:

    # define API url
    url: str = f"http://{VEHICLE_NAME}.local/health/{category}"

    # call API (raises `requests.exceptions.Timeout` if the robot does not answer in time)
    response = requests.get(url, timeout=HTTP_TIMEOUT_SECS)
    data: dict = response.json()

    # print response out
    print(f"{category}:\t", data)

In [None]:
# Telemetry - Compute - Defining Components

import time
from abc import ABC
from typing import Type, TypeVar, Dict

import requests

from duckietown.telemetry import GenericTelemetryAbs, CPUTelemetry, GPUTelemetry, MemoryTelemetry, DiskTelemetry
from duckietown.types import IQueue, Queue
from duckietown.components.base import Component
from duckietown.components.rendering import MarkdownRendererComponent
from duckietown.system import System


T = TypeVar("T", bound=GenericTelemetryAbs)


class GenericComputeTelemetryComponentAbs(Component[None, T], ABC):
    """
    Base component that repeatedly polls one category of a Duckiebot's health API
    over HTTP and publishes the parsed telemetry objects on `out_data`.

    Args:
        TelemetryClass: telemetry type used to parse the payload through its `from_dict`.
        category:       health API category; used as the URL suffix and as the key
                        under which the payload is looked up in the JSON response.
        vehicle_name:   name of the robot; the API is reached at `http://<vehicle_name>.local`.
        frequency:      polling frequency in Hz, must be positive. The worker sleeps
                        `1 / frequency` seconds between requests.
        timeout:        seconds to wait for each HTTP response, must be positive.
    """

    def __init__(self, TelemetryClass: Type[T], category: str, vehicle_name: str, frequency: float,
                 timeout: float = 5.0):
        super(GenericComputeTelemetryComponentAbs, self).__init__()
        assert frequency > 0, "frequency must be positive"
        assert timeout > 0, "timeout must be positive"
        self._category: str = category
        self._TelemetryClass: Type[T] = TelemetryClass
        # define url
        self._url: str = f"http://{vehicle_name}.local/health/{self._category}"
        self._frequency: float = frequency
        self._timeout: float = timeout
        # queues
        self.out_data: IQueue = Queue()

    def worker(self):
        """Poll the health API until the component is shut down, publishing each reading."""
        # the pause between two requests does not change, compute it once
        period: float = 1.0 / self._frequency
        while not self.is_shutdown:
            # call API; the timeout keeps a stalled request from blocking this loop
            # forever, which would also prevent it from noticing a shutdown
            response = requests.get(self._url, timeout=self._timeout)
            # surface HTTP errors here rather than as an obscure JSON/KeyError below
            response.raise_for_status()
            data: dict = response.json()
            # instantiate Telemetry object
            telemetry: T = self._TelemetryClass.from_dict(data[self._category])
            # send data out
            self.out_data.put(telemetry)
            # wait
            time.sleep(period)


class CPUTelemetryComponent(GenericComputeTelemetryComponentAbs[CPUTelemetry]):
    """Telemetry component bound to the "cpu" category, parsing payloads as `CPUTelemetry`."""

    def __init__(self, vehicle_name: str, frequency: float):
        super().__init__(
            TelemetryClass=CPUTelemetry,
            category="cpu",
            vehicle_name=vehicle_name,
            frequency=frequency,
        )


class GPUTelemetryComponent(GenericComputeTelemetryComponentAbs[GPUTelemetry]):
    """Telemetry component bound to the "gpu" category, parsing payloads as `GPUTelemetry`."""

    def __init__(self, vehicle_name: str, frequency: float):
        super().__init__(
            TelemetryClass=GPUTelemetry,
            category="gpu",
            vehicle_name=vehicle_name,
            frequency=frequency,
        )


class MemoryTelemetryComponent(GenericComputeTelemetryComponentAbs[MemoryTelemetry]):
    """Telemetry component bound to the "memory" category, parsing payloads as `MemoryTelemetry`."""

    def __init__(self, vehicle_name: str, frequency: float):
        super().__init__(
            TelemetryClass=MemoryTelemetry,
            category="memory",
            vehicle_name=vehicle_name,
            frequency=frequency,
        )


class DiskTelemetryComponent(GenericComputeTelemetryComponentAbs[DiskTelemetry]):
    """Telemetry component bound to the "disk" category, parsing payloads as `DiskTelemetry`."""

    def __init__(self, vehicle_name: str, frequency: float):
        super().__init__(
            TelemetryClass=DiskTelemetry,
            category="disk",
            vehicle_name=vehicle_name,
            frequency=frequency,
        )


In [None]:
# Telemetry - Compute - Read telemetry using a synchronizer and then render using Markdown

import time
import requests
from typing import Tuple
from duckietown.telemetry import GenericTelemetryAbs, CPUTelemetry, GPUTelemetry, MemoryTelemetry, DiskTelemetry
from duckietown.types import IQueue, Queue
from duckietown.components.base import Component
from duckietown.components.rendering import MarkdownRendererComponent
from duckietown.components.utilities import SynchronizerComponent
from duckietown.system import System

def pbar(percentage: float, length: int = 32) -> str:
    """
    Render a percentage as a fixed-width textual progress bar.

    Args:
        percentage: value on a 0-100 scale; values outside that range are clamped.
        length:     total number of characters in the bar.

    Returns:
        A string of exactly `length` characters: `#` for the used share
        (rounded down) followed by `.` for the remainder.
    """
    # clamp first: an out-of-range reading would otherwise make `used` or `unused`
    # negative and yield a bar that is longer than `length`
    clamped: float = min(max(percentage, 0), 100)
    used: int = int((clamped / 100) * length)
    unused: int = length - used
    return "#" * used + "." * unused

def formatter(data: Tuple[CPUTelemetry, GPUTelemetry, MemoryTelemetry, DiskTelemetry]) -> str:
    """
    Build the Markdown report shown by the renderer from one set of telemetry readings.

    Args:
        data: tuple of (cpu, gpu, memory, disk) telemetry objects, in that order.

    Returns:
        Markdown text with one section per category; each section shows a headline
        figure and a `pbar` usage bar. CPU frequency and memory/disk totals are
        divided by 10^9 before being labeled GHz / GB.
    """
    cpu, gpu, memory, disk = data
    giga: int = 10 ** 9
    # values are computed in the same order in which they appear in the template
    fields = {
        "cpu_freq": cpu.frequency.current / giga,
        "cpu_perc_pbar": pbar(cpu.percentage),
        "gpu_power": gpu.power,
        "gpu_perc_pbar": pbar(gpu.percentage),
        "mem_total": memory.total / giga,
        "mem_perc_pbar": pbar(memory.percentage),
        "disk_total": disk.total / giga,
        "disk_perc_pbar": pbar(disk.percentage),
    }
    # each `\\` below becomes a single trailing backslash in the rendered text
    template: str = """
### CPU
Frequency: ``{cpu_freq:.2f} GHz``\\
Used: ``0% [{cpu_perc_pbar}] 100%``

### GPU
Power: ``{gpu_power} Watt``\\
Used: ``0% [{gpu_perc_pbar}] 100%``

### Memory
Total: ``{mem_total:.2f} GB``\\
Used: ``0% [{mem_perc_pbar}] 100%``

### Disk
Total: ``{disk_total:.2f} GB``\\
Used: ``0% [{disk_perc_pbar}] 100%``
"""
    return template.format(**fields)

# polling frequency (Hz) shared by all four telemetry components
TELEMETRY_FREQUENCY_HZ: float = 1.0

# create components
cpu: CPUTelemetryComponent = CPUTelemetryComponent(VEHICLE_NAME, TELEMETRY_FREQUENCY_HZ)
gpu: GPUTelemetryComponent = GPUTelemetryComponent(VEHICLE_NAME, TELEMETRY_FREQUENCY_HZ)
memory: MemoryTelemetryComponent = MemoryTelemetryComponent(VEHICLE_NAME, TELEMETRY_FREQUENCY_HZ)
disk: DiskTelemetryComponent = DiskTelemetryComponent(VEHICLE_NAME, TELEMETRY_FREQUENCY_HZ)
renderer: MarkdownRendererComponent = MarkdownRendererComponent(formatter)

# the synchronizer reads from the four telemetry queues; the order given here
# matches the (cpu, gpu, memory, disk) order that `formatter` unpacks
# NOTE(review): presumably it emits one tuple per matched set of readings - confirm
synchronizer: SynchronizerComponent = SynchronizerComponent(
    [cpu.out_data, gpu.out_data, memory.out_data, disk.out_data]
)

# connect components
renderer.in_data.wants(synchronizer.out_data)

# create system
# the synchronizer is included so that the system runs it together with the other
# components; the renderer consumes its output, so it cannot be left out
system: System = System([cpu, gpu, memory, disk, synchronizer, renderer])

# run system (NOTE: this is blocking)
system.run()