### Inference of Energy Consumption to serverless functions
This script is dedicated to demonstrate the concept of distributing the energy consumption of a system to the serverless functions that are deployed to it.

To this end, the CPU Usage of the serverless function containers during time interval t is put into relation with the total CPU Usage of the system during time interval t and then multiplied with the measured energy consumption measured during time interval t.

In [1]:
# Imports
import requests
import logging
import json
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s >>> %(message)s',
                        handlers=[logging.StreamHandler()])
logger = logging.getLogger(__name__)

In [2]:
# Data collection settings
DATA_COLLECTION_INTERVAL_SECONDS = 60

# Prometheus settings
PROMETHEUS_BASE_URL = "http://localhost:30009"
REST_API_PATH = "/api/v1/query"

In [3]:
def prom_instant_query(query_str: str) -> dict: 
    """ Executes a Prometheus API Query using the given query. """
    query_url = f"{PROMETHEUS_BASE_URL}/{REST_API_PATH}"
    response = requests.get(query_url, params={'query': query_str})
    
    if response:
        return response.json()
    else:
        logger.info(f"API Request ({response.url}) was not successful: HTTP {response.status_code}")
        return dict()

def get_fn_container_cpu_usages(namespace: str = "openfaas-fn"):
    """ Returns the CPU usage of all serverless function containers in the given namespace. """
    query = "rate(container_cpu_usage_seconds_total{image!='', namespace='openfaas-fn', container_name!='POD'}[1m]) > 0"
    query_result = prom_instant_query(query_str=query)
    
    if query_result:
        logger.debug("FUNCTION CONTAINER CPU USAGES")
        logger.debug(json.dumps(query_result, indent=4))
        return query_result
    else:
        logger.error(f"Failed to retrieve the CPU Usages of containers in namespace {namespace}.")
        return dict()
    
def get_container_cpu_usage_sum(container_name: str, node: str):
    """ Returns the sum of the CPU usage of all containers with the given container_name on the given
        node. """
    query = "sum(rate(container_cpu_usage_seconds_total{image!='', container='%s', container_name!='POD', node='%s'}[1m]))" % (container_name, node)
    query_result = prom_instant_query(query_str=query)
    
    if query_result:
        logger.debug(f"SUM OF CPU USAGE OF CONTAINER {container_name.upper()} ON NODE {node.upper()}")
        logger.debug(json.dumps(query_result, indent=4))
        timestamp, sum_cpu_usage = tuple(query_result['data']['result'][0]['value'])
        return sum_cpu_usage
    else:
        logger.error(f"Failed to retrieve the container CPU usage sum for '{container_name}' on node '{node}'")
        return dict()
    
def get_node_cpu_usage_sum(node: str):
    """ Returns the sum of the CPU usage of all containers on the given node. """
    query = "sum(rate(container_cpu_usage_seconds_total{image!='', container_name!='POD', node='%s'}[1m]))" % node
    query_result = prom_instant_query(query_str=query)
    
    if query_result:
        logger.debug(f"NODE CPU USAGE SUM OF NODE {node.upper()}")
        logger.debug(json.dumps(query_result, indent=4))
        timestamp, sum_cpu_usage = tuple(query_result['data']['result'][0]['value'])
        return sum_cpu_usage
    else:
        logger.error(f"Failed to retrieve the node cpu usage sum of node '{node}'")
        return dict()
    
def get_node_energy_consumption(node: str):
    """ Returns the energy consumption of the given node. """
    query = "idelta(powerexporter_power_consumption_ampere_seconds_total{instance='%s'}[2m:1m])" % node
    query_result = prom_instant_query(query_str=query)
    
    if query_result:
        logger.debug(f"ENERGY CONSUMPTION OF NODE {node.upper()}")
        logger.debug(json.dumps(query_result, indent=4))
        timestamp, energy_consumption = tuple(query_result['data']['result'][0]['value'])
        return energy_consumption
    else:
        logger.error(f"Failed to retrieve the energy consumption of node '{node}'")
        return dict()
    
def collect_data():
    while True:
        container_map = dict()
        energy_map = dict()
        # Step 1: Get CPU usage of all containers in openfaas-fn
        # This gives us two things in 1 query:
        #   1) the various serverless functions that were deployed in the time frame
        #   2) the deployment node per container
        fn_container_cpu_usages = get_fn_container_cpu_usages()
        for fn_container in fn_container_cpu_usages['data']['result']:
            container_name = fn_container['metric']['container']
            
            node = fn_container['metric']['node']
            # Step 2: Get Node CPU Usage as sum of cpu usage of all containers
            sum_node_cpu_usage = get_node_cpu_usage_sum(node)
            
            # Step 3: Get energy consumption of node
            if node not in energy_map:
                energy_consumption = get_node_energy_consumption(node)
                energy_map.update({node: float(energy_consumption)})
                
            cpu_usages_per_node = container_map.setdefault(container_name, dict()) # node -> cpu usage of fn containers
            if node not in cpu_usages_per_node:
                sum_cpu_usage_container = get_container_cpu_usage_sum(container_name, node)
                cpu_usages_per_node.update({node: float(sum_cpu_usage_container) / float(sum_node_cpu_usage)})
            
            logger.info(energy_map)
            logger.info(container_map)
            
        logger.info("==============================================================")
        # Print results:
        for node in energy_map:
            logger.info(f"Energy Consumption of {node} = {energy_map.get(node)}")
                
        for function, cpu_per_node in container_map.items():
            for node, cpu_usage in cpu_per_node.items():
                logger.info(f"Energy Consumption ({function} | {node}) = {cpu_usage * energy_map.get(node)}")
            
        
        logger.info(f"Sleeping for {DATA_COLLECTION_INTERVAL_SECONDS} seconds ...")
        time.sleep(DATA_COLLECTION_INTERVAL_SECONDS)

In [None]:
# Collect data and distribute energy consumption until CTRL + C
collect_data()

2022-08-02 20:19:48,242 - __main__ - INFO >>> {'odroidxu4-1': 9.636629428923698}
2022-08-02 20:19:48,244 - __main__ - INFO >>> {'analyze-sentence': {'odroidxu4-1': 0.042695975429297324}}
2022-08-02 20:19:48,245 - __main__ - INFO >>> Energy Consumption of odroidxu4-1 = 9.636629428923698
2022-08-02 20:19:48,246 - __main__ - INFO >>> Energy Consumption (analyze-sentence | odroidxu4-1) = 0.4114452933185697
2022-08-02 20:19:48,444 - __main__ - INFO >>> {'odroidxu4-1': 9.636629428923698}
2022-08-02 20:19:48,445 - __main__ - INFO >>> {'analyze-sentence': {'odroidxu4-1': 0.042695975429297324}, 'nodeinfo': {'odroidxu4-1': 0.03684966696090755}}
2022-08-02 20:19:48,447 - __main__ - INFO >>> Energy Consumption of odroidxu4-1 = 9.636629428923698
2022-08-02 20:19:48,448 - __main__ - INFO >>> Energy Consumption (analyze-sentence | odroidxu4-1) = 0.4114452933185697
2022-08-02 20:19:48,448 - __main__ - INFO >>> Energy Consumption (nodeinfo | odroidxu4-1) = 0.355106585081519
2022-08-02 20:19:48,449 - __