# Getting the energy profile of Marconi100 jobs

In this notebook read the marconi data and i filter the jobs that have a processing time greater than 60 seconds and that have an energy profile (taking into account the p0_power metric)

## 1. Read Data

In [16]:
import pandas as pd

df_jobs = pd.read_parquet("../Marconi-test/plugin=job_table/metric=job_info_marconi100/a_0.parquet")

## Other power-related metrics available
#"../Marconi-test/plugin=ipmi_pub/metric=p1_power/a_0.parquet"
#"../Marconi-test/plugin=ipmi_pub/metric=p0_io_power/a_0.parquet"
#"../Marconi-test/plugin=ipmi_pub/metric=p1_io_power/a_0.parquet"
#"../Marconi-test/plugin=ipmi_pub/metric=p0_mem_power/a_0.parquet"
#"../Marconi-test/plugin=ipmi_pub/metric=p1_mem_power/a_0.parquet"
#"../Marconi-test/plugin=ipmi_pub/metric=total_power/a_0.parquet"
df_power_p0_power = pd.read_parquet("../Marconi-test/plugin=ipmi_pub/metric=total_power/a_0.parquet")
#df_jobs.columns

In [17]:
df_power_p0_power['node'] = pd.to_numeric(df_power_p0_power['node'])
df_power_p0_power['value'] = pd.to_numeric(df_power_p0_power['value'])
print(df_power_p0_power.columns)
print(df_jobs.columns)

Index(['timestamp', 'value', 'node'], dtype='object')
Index(['accrue_time', 'alloc_node', 'alloc_sid', 'array_job_id',
       'array_max_tasks', 'array_task_id', 'array_task_str',
       'array_task_throttle', 'assoc_id', 'batch_flag', 'batch_host',
       'billable_tres', 'bitflags', 'boards_per_node', 'contiguous',
       'cores_per_socket', 'cpus_alloc_layout', 'cpus_allocated',
       'cpus_per_task', 'cpus_per_tres', 'dependency', 'derived_ec',
       'eligible_time', 'end_time', 'exc_nodes', 'exit_code', 'features',
       'group_id', 'job_id', 'job_state', 'last_sched_eval', 'max_cpus',
       'max_nodes', 'mem_per_cpu', 'mem_per_node', 'min_memory_cpu',
       'min_memory_node', 'nice', 'nodes', 'ntasks_per_board',
       'ntasks_per_core', 'ntasks_per_core_str', 'ntasks_per_node',
       'ntasks_per_socket', 'ntasks_per_socket_str', 'num_cpus', 'num_nodes',
       'num_tasks', 'partition', 'pn_min_cpus', 'pn_min_memory',
       'pn_min_tmp_disk', 'power_flags', 'priority', 'pr

## 2. Converting timestamps to seconds

In [18]:
import numpy as np

#df_jobs['run_time_seconds']

df_jobs["run_time"] = (df_jobs['end_time']-df_jobs['start_time']) / np.timedelta64(1, 's')
df_jobs["run_time"]

0             3.0
1         78983.0
2             4.0
3            12.0
4             3.0
           ...   
239937    12918.0
239938    78453.0
239939     9166.0
239940       13.0
239941    78803.0
Name: run_time, Length: 239942, dtype: float64

## Filter 1: removing jobs that take less than a minute

In [4]:
SECONDS_ONE_MINUTE = 60.0

df_jobs_f1 = df_jobs.loc[df_jobs["run_time"] >= SECONDS_ONE_MINUTE]
perc_jobs_filtered = (len(df_jobs_f1)/len(df_jobs))*100
print(str(perc_jobs_filtered), "% of the original job table")

60.0953563777913 % of the original job table


## Filter 2: remove jobs with no energy profile

Fast method for single-node jobs

In [61]:
import ast

#df_jobs_f12 = pd.DataFrame(lst_jobs_f12)

#print(df_jobs_f12)

## do faster for single node jobs
df_jobs_f1_single = df_jobs_f1[df_jobs_f1["num_nodes"]==1]

#removing malformed "nodes"
df_jobs_f1_single = df_jobs_f1_single[df_jobs_f1_single["nodes"].str.match("\[\d+\]")].copy().reset_index(drop=True)
df_jobs_f1_single["node"] = [ast.literal_eval(x)[0] for x in df_jobs_f1_single['nodes']]


#use this for smaller inputs
#sample = df_jobs_f1_single[0:10000].copy().reset_index(drop=True)

sample = df_jobs_f1_single

group = df_power_p0_power.groupby(by="node")
#group.get_group(519)["timestamp"]

#result = sample.apply(lambda x: any(group.get_group(x["node"])["timestamp"].between(x["start_time"], x["end_time"])), axis=1)
#print(result.values)
#print(sample[["node", "start_time", "end_time"]].values.tolist())


#for debugging
#result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]

#i know that 0 is node, 1 is start_time, and 2 is end_time
result = [any(group.get_group(x[0])["timestamp"].between(x[1], x[2])) for x in sample[["node", "start_time", "end_time"]].values.tolist()]
#print(result)
sample["has_profile"] = result
df_jobs_f12_single = sample[sample["has_profile"]==True]
#[group.get_group(x["node"]["timestamp"].between(x["start_time"], x["end_time"])) for x in sample.values.tolist()]

#result = sample.any(df_power_p0_power[df_power_p0_power['node'] == node]
#print(df_jobs_f1_single["node"])

#df_jobs_f1_single_2 = df_jobs_f1_single.apply(lambda x : x[re.search("\[\d+\]", x["nodes"]) != None], axis=1)
    
    

#df_jobs_f1_single["node"] = ast.literal_eval(df_jobs_f1_single["nodes"])[0]
#print(df_jobs_f1_single.all(re.search("\[\d+\]", df_jobs_f1_single["nodes"]) == None))


perc_jobs_filtered = (len(df_jobs_f12_single)/len(df_jobs))*100
print(str(perc_jobs_filtered), "% of the original job table")

52.24846004451076 % of the original job table


Slower method for multi-node jobs

In [26]:
import ast

#df_jobs_f12 = pd.DataFrame(lst_jobs_f12)

#print(df_jobs_f12)

## do faster for single node jobs
df_jobs_f1_multi = df_jobs_f1[df_jobs_f1["num_nodes"] > 1].copy().reset_index(drop=True)

#removing malformed "nodes"
#df_jobs_f1_multi = df_jobs_f1_multi[df_jobs_f1_multi["nodes"].str.match("\[[^\]+]\]")].copy().reset_index(drop=True)
df_jobs_f1_multi["node"] = [ast.literal_eval(x) for x in df_jobs_f1_multi['nodes']]
#print(df_jobs_f1_multi)

if True:
    #use this for smaller inputs
    #sample = df_jobs_f1_multi[0:10].copy().reset_index(drop=True)

    sample = df_jobs_f1_multi

    group = df_power_p0_power.groupby(by="node")
    #group.get_group(519)["timestamp"]

    #result = sample.apply(lambda x: any(group.get_group(x["node"])["timestamp"].between(x["start_time"], x["end_time"])), axis=1)
    #print(result.values)
    #print(sample[["node", "start_time", "end_time"]].values.tolist())


    #for debugging
    #result = [group.get_group(x[0])["timestamp"].between(x[1], x[2]).value_counts() for x in sample[["node", "start_time", "end_time"]].values.tolist()]

    #i know that 0 is node, 1 is start_time, and 2 is end_time
    result = [all([any(group.get_group(y)["timestamp"].between(x[1], x[2])) for y in x[0]]) for x in sample[["node", "start_time", "end_time"]].values.tolist()]
    
    #result = [any(group.get_group(x[0])["timestamp"].between(x[1], x[2])) for x in sample[["node", "start_time", "end_time"]].values.tolist()]
    #print(result)
    
    sample["has_profile"] = result
    df_jobs_f12_multi = sample[sample["has_profile"]==True]
   


    perc_jobs_filtered = (len(df_jobs_f12_multi)/len(df_jobs_f1_multi))*100
    print(str(perc_jobs_filtered), "% of the original job table")

98.77255349117982 % of the original job table


In [None]:
import ast

if False:
    lst_jobs_f12_multi = []

    df_jobs_f1_multi = df_jobs_f1[df_jobs_f1["num_nodes"] > 1]

    #print(len(df_jobs_f1_multi))

    for index in range(len(df_jobs_f1_multi)): 
        #break   
        job=df_jobs_f1_multi.iloc[index,:]
        try:
            nodes=ast.literal_eval(job['nodes'])
        except ValueError:
            #print(job['nodes'])
            continue
        start_time=job["start_time"]
        end_time=job["end_time"]
        for node in nodes:
            #print(node)
            df_node = df_power_p0_power.loc[df_power_p0_power['node'] == node]
            df_node_job = df_node.loc[df_node['timestamp'].between(start_time, end_time)]
            #df_node_job = df_power_p0_power[(df_power_p0_power.loc[df_power_p0_power['node'] == node]) & (df_power_p0_power['timestamp'].between(start_time, end_time))]

            #np_timestamps = df_node['timestamp'].to_numpy()
            #np_node_job = np_timestamps[(np_timestamps >= start_time) & (np_timestamps <= end_time)]        
            
            #print(df_node_job)
            if not df_node_job.empty:
                lst_jobs_f12_multi.append(job)
                #print(df_node_job)
        print("Job "+str(index)+" of "+str(len(df_jobs_f1_multi)), end="\r")

    df_jobs_f12_multi = pd.DataFrame(lst_jobs_f12_multi)

    perc_jobs_filtered = (len(df_jobs_f12_multi)/len(df_jobs_f1_multi))*100
    print(str(perc_jobs_filtered), "% of the original job table")

## Saving intermediate results

In [27]:
#df_jobs_f12_single.to_csv("../Marconi-test/plugin=job_table/metric=job_info_marconi100/a_0_f12_singlenode.csv", index=False)
#df_jobs_f12_multi.to_csv("../Marconi-test/plugin=job_table/metric=job_info_marconi100/a_0_f12_multinode.csv", index=False)

# Visualizations

In [3]:
import pandas as pd

df_jobs_f12_viz = pd.read_csv("../Marconi-test/plugin=job_table/metric=job_info_marconi100/a_0_f12_singlenode.csv")

df_power_p0_power = pd.read_parquet("../Marconi-test/plugin=ipmi_pub/metric=total_power/a_0.parquet")
df_power_p0_power['node'] = pd.to_numeric(df_power_p0_power['node'])
df_power_p0_power['value'] = pd.to_numeric(df_power_p0_power['value'])

  df_jobs_f12_viz = pd.read_csv("../Marconi-test/plugin=job_table/metric=job_info_marconi100/a_0_f12_singlenode.csv")


## Raw jobs energy profile

In [5]:
from __future__ import print_function
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
import seaborn as sns
import matplotlib.pyplot as plt
import ast

#gfig = None

def call_workflow(index):
    get_job_energy_profile(index)

def get_job_energy_profile(index):
    global current_job_id, current_hostname
    annot_str=''
    job=df_jobs_f12_viz.iloc[index,:]
    nodes=ast.literal_eval(job['nodes'])
    start_time=job["start_time"]
    end_time=job["end_time"]
    for node in nodes:
        print(node)
        df_node = df_power_p0_power.loc[df_power_p0_power['node'] == node]
        df_node_job = df_node.loc[df_node['timestamp'].between(start_time, end_time)]
        if len(df_node_job) > 0:
            print(df_node_job.describe(), end="\r")
            plot_energy_profile(df_node_job, annot_str)
    print(nodes)
     
    #plot_energy_profile(job_energy_profile, annot_str)

    #current_job_id=str(job['job_id'])
    #current_hostname=energy_host
    ##
    #describe=job_energy_profile.describe(percentiles=[.10, .25, .5, .75, .90])
    #describe['job_id']=job['job_id']
    #describe['socket']=socket
    #describe['pp0']=describe[right_pp0]
    #describe['DRAM']=describe[right_DRAM]
    #describe['stat']=describe.index
    #describe=describe.reset_index(drop=True)[arr_cols]  
    #print(describe)  


def plot_energy_profile(profile, annot_str):
    TINY_SIZE = 2
    SMALL_SIZE = 5
    MEDIUM_SIZE = 25
    BIGGER_SIZE = 50
    FIG_WIDTH = 50
    FIG_HEIGHT = 20

    global gfig    

    plt.rc('font', size=BIGGER_SIZE)          # controls default text sizes
    plt.rc('axes', titlesize=BIGGER_SIZE)     # fontsize of the axes title
    plt.rc('axes', labelsize=BIGGER_SIZE)     # fontsize of the x and y labels
    plt.rc('xtick', labelsize=BIGGER_SIZE)    # fontsize of the tick labels
    plt.rc('ytick', labelsize=BIGGER_SIZE)    # fontsize of the tick labels
    plt.rc('legend', fontsize=BIGGER_SIZE)    # legend fontsize
    plt.rc('figure', titlesize=BIGGER_SIZE)  # fontsize of the figure title
    scatterplot_kwargs={'s': 50, 'palette': 'plasma'}
    lineplot_kwargs={'linewidth': 5}

    plt.clf()
    fig = plt.figure(figsize=(FIG_WIDTH,FIG_HEIGHT))
    #ax = sns.boxplot(x='stat', y='value', data=plot_data, showfliers=False, hue='reading',
    #             linewidth=TINY_SIZE)

    ax = sns.scatterplot(data=profile, x='timestamp', y='value', **scatterplot_kwargs)
    #ax = sns.lineplot(data=profile, x='timestamp', y='value', **lineplot_kwargs)

    ## SET BORDERS SIZE AND WIDTH
    [line.set_linewidth(TINY_SIZE) for line in ax.spines.values()]
    [line.set_markersize(TINY_SIZE) for line in ax.yaxis.get_ticklines()]
    [line.set_markeredgewidth(TINY_SIZE) for line in ax.yaxis.get_ticklines()]
    [line.set_markersize(SMALL_SIZE) for line in ax.xaxis.get_ticklines()]
    [line.set_markeredgewidth(TINY_SIZE) for line in ax.xaxis.get_ticklines()]
    #ax.text(x=0.1,y=0.5,
    #        s=annot_str,
    #        fontdict=dict(color='red',size=MEDIUM_SIZE),
    #        bbox=dict(facecolor='yellow',alpha=0.5),
    #        horizontalalignment='left',
    #        verticalalignment='center',
    #        transform=ax.transAxes)
    ax.set_ylabel('Processor Power (Watts)')
    ax.set_xlabel('Timestamp')
    gfig = fig

button = widgets.Button(description="Save to PDF")
output = widgets.Output()

display(button, output)

## TODO: Pass on b the data (job_id, hostname, etc)
def on_button_clicked(b):
    with output:
        #fig=plt.gcf()
        #print(gfig)
        fig_filename='../Figures/marconi100_interact_plot_'+current_hostname+'_'+current_job_id+'.pdf'
        gfig.savefig(fig_filename, format='pdf', dpi=300, bbox_inches='tight')
        print('Plot saved as '+fig_filename)

button.on_click(on_button_clicked)

interact(call_workflow, index=widgets.IntSlider(min=0, max=len(df_jobs_f12_viz)-1, step=1, value=0));

Button(description='Save to PDF', style=ButtonStyle())

Output()

interactive(children=(IntSlider(value=0, description='index', max=125365), Output()), _dom_classes=('widget-in…

## Notes

- It seems that whatever is reading the p0 power has an imprecision of about two watts. We notice "steps" of two watts for many jobs. We may need to aggregate to have a smoother time series

- Marconi100 information: https://wiki.u-gov.it/confluence/pages/viewpage.action?pageId=336727645
  - It considers node sharing