# Analysis of LSST Batch Activity at CC-IN2P3

*Source: [https://github.com/airnandez/alba](https://github.com/airnandez/alba)*

*Author: Fabio Hernandez, CC-IN2P3*

## Introduction

The purpose of this notebook is to analyse accounting information emitted by GridEngine about the activity of LSST batch jobs executed at CC-IN2P3.

In [1]:
import pathlib
import datetime
import sys
import re
import collections

import IPython.display
print_md = IPython.display.Markdown

import pandas as pd
import numpy as np
import bokeh
import bokeh.plotting as bkh
import bokeh.models as bkhmodels
bkh.output_notebook()

### Dependencies

This notebook uses the packages listed below. The links point to their documentation:

* <a href="https://bokeh.pydata.org/en/latest/docs/reference.html" target="_blank">bokeh</a>
* <a href="https://docs.scipy.org/doc/" target="_blank">numpy</a>
* <a href="http://pandas.pydata.org/pandas-docs/stable/"  target="_blank">pandas</a>
* <a href="https://docs.python.org/3/library/index.html" target="_blank">python</a>

In [2]:
table = f"""
These are the versions of those dependencies you are currently using:

| Component | Version              |
| --------: | -------------------- |
|  bokeh    | {bokeh.__version__}  |
|  numpy    | {np.version.version} |
|  pandas   | {pd.__version__}     |
|  python   | {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro} |
"""
print_md(table)


These are the versions of those dependencies you are currently using:

| Component | Version              |
| --------: | -------------------- |
|  bokeh    | 1.0.1  |
|  numpy    | 1.14.2 |
|  pandas   | 0.23.4     |
|  python   | 3.6.8 |


## Load the accounting records into a Pandas dataframe

Set the data directory, where JSON accounting files are located. The repository of accounting data at CC-IN2P3 is `/sps/hep/ccin2p3/data/BatchFiles`. In that directory one can find one JSON file per month (e.g. `GE.accounting.2018.11.json`) which contains records of batch jobs of all groups using CC-IN2P3's batch service.

Records for jobs relevant to LSST can be found at CC-IN2P3 at `/sps/lsst/groups/accounting`. Those records are copied to the LSST Science Platform under `/path/to/location/at/ncsa`.

There is a JSON file per month (e.g. `GE.accounting.2018.11.lsst.json.gz`) which only contains accounting records for jobs submitted by members of the Unix group `lsst`:

In [3]:
# The locations below are searched for accounting data. The first location found is used.
search_paths = (
    # For development, we assume that data files are located in the './data' directory,
    # relative to the location of this notebook
    pathlib.Path.cwd().joinpath('data'),
    
    # Location at CC-IN2P3
    pathlib.Path('/sps/lsst/groups/accounting'),
    
    # Location at the LSST Science Platform at NCSA
    pathlib.Path('/lsst/jhome/fabioh/DATA/accounting'),
    
    # Location at NERSC
    pathlib.Path('/global/projecta/projectdirs/lsst/global/in2p3/accounting'),
)

data_dir = None
for p in search_paths:
    if p.exists():
        data_dir = p
        break
        
if not data_dir:
    raise ValueError("Could not find the location of the accounting data")

d = data_dir if data_dir != pathlib.Path.cwd().joinpath('data') else './data'
print_md(f'Data directory: `{d}`')

Data directory: `./data`
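
The "first location found is used" rule above can also be written as a single expression. The following is a minimal sketch, not part of the original notebook: the helper name `first_existing` and the temporary directory are hypothetical and used only for illustration.

```python
import pathlib
import tempfile

def first_existing(paths):
    """Return the first path in 'paths' that exists, or None if none does."""
    return next((p for p in paths if p.exists()), None)

# Hypothetical candidates: a missing directory followed by a temporary one that exists
with tempfile.TemporaryDirectory() as tmp:
    candidates = (pathlib.Path(tmp) / 'missing', pathlib.Path(tmp))
    found = first_existing(candidates)
    found_matches = (found == pathlib.Path(tmp))
    print(f'Selected: {found}')
```

Returning `None` when nothing is found keeps the error handling (raising an exception) in the caller, as in the cell above.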

Import the accounting files in the data directory whose names match the selected pattern:

In [4]:
# To analyse ALL the accounting files in the directory use
#    pattern = 'GE.accounting.*.json*'
# To analyse data of all months for a single year use:
#    pattern = 'GE.accounting.2018.*.json*'
# To analyse data of some months of a given year use:
#   pattern = 'GE.accounting.2018.1[0-1]*.json*'
pattern = 'GE.accounting.2018.*.json*'

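frames = []
for path in sorted(data_dir.glob(pattern)):
    print(f'Loading data file {path.name}')
    frames.append(pd.read_json(path, lines=True))

# Concatenate once at the end: appending to a dataframe in a loop copies it at every iteration
df = pd.concat(frames) if frames else pd.DataFrame()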
    
rows = df.shape[0]
if rows == 0:
    raise Exception("no job records could be loaded")
    
print_md(f'Loaded **{rows}** job accounting records')

Loading data file GE.accounting.2018.01.lsst.json.gz
Loading data file GE.accounting.2018.02.lsst.json.gz
Loading data file GE.accounting.2018.03.lsst.json.gz
Loading data file GE.accounting.2018.04.lsst.json.gz
Loading data file GE.accounting.2018.05.lsst.json.gz
Loading data file GE.accounting.2018.06.lsst.json.gz
Loading data file GE.accounting.2018.07.lsst.json.gz
Loading data file GE.accounting.2018.08.lsst.json.gz
Loading data file GE.accounting.2018.09.lsst.json.gz
Loading data file GE.accounting.2018.10.lsst.json.gz
Loading data file GE.accounting.2018.11.lsst.json.gz
Loading data file GE.accounting.2018.12.lsst.json.gz


Loaded **279613** job accounting records
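
To illustrate the file format, the sketch below writes two made-up records to a gzip-compressed file with one JSON object per line and reads it back the same way the loading cell does. The record values and the temporary location are assumptions for illustration only; real accounting records have many more fields.

```python
import gzip
import json
import pathlib
import tempfile

import pandas as pd

# Two made-up records using a handful of the real column names
records = [
    {'job_number': 1, 'owner': 'descprod', 'qname': 'long', 'wallclock': 120},
    {'job_number': 2, 'owner': 'lsstprod', 'qname': 'mc_long', 'wallclock': 3600},
]

with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / 'GE.accounting.2018.11.lsst.json.gz'
    with gzip.open(path, 'wt') as f:
        for rec in records:
            f.write(json.dumps(rec) + '\n')

    # lines=True parses one JSON object per line; the gzip compression is
    # inferred from the '.gz' suffix of the file name
    sample = pd.read_json(path, lines=True)

print(sample.shape)
```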

### Dataframe column renaming

In this section we rename some columns of the dataframe, if needed, to make their contents explicit and intuitive in the remainder of this notebook.

In [5]:
if False:
    df.rename(inplace=True, columns={
        # Modify column names as below
        'old_name':  'new_name',

        # Rename some fields to make explicit it is an extension to the raw data
        'x_worker_might_sectohs06sec': 'x_worker_hs06sec_factor',
    })

### Dataframe contents overview

Each row in the dataframe contains information about a single job. This information is emitted by GridEngine, preprocessed, extended and reformatted in JSON to make it easier to analyse (see [here](https://gitlab.in2p3.fr/Batch-tools/GE-batch-system)).

Below is the list of columns in the dataframe, shown in alphabetical order. The columns prefixed by `x_` (e.g. `x_cputime_hs06sec`) are extensions to the data provided by GridEngine.

In [6]:
num_rows, num_columns = df.shape
columns = sorted(list(df))
print_md(f'Dataframe size: **{num_rows} rows, {num_columns} columns:**\n\n `{", ".join(columns)}`')

Dataframe size: **279613 rows, 70 columns:**

 `account, ar_submission_time, arid, category, cpu, cwd, department, end_time, exit_status, failed, granted_pe, group, hostname, io, ioops, iow, job_class, job_name, job_number, maxpss, maxrss, maxvmem, mem, owner, priority, project, qdel_info   , qname, ru_idrss, ru_inblock, ru_ismrss, ru_isrss, ru_ixrss, ru_majflt, ru_maxrss, ru_minflt, ru_msgrcv, ru_msgsnd, ru_nivcsw, ru_nsignals, ru_nswap, ru_nvcsw, ru_oublock, ru_stime, ru_utime, ru_wallclock, slots, start_time, submission_time, submit_cmd, submit_host, task_number, taskid, wallclock, x_cpuefficiency_percore, x_cputime_hs06sec, x_cputime_sec, x_limits_s_cpu, x_limits_s_rss_gb, x_limits_s_vmem_gb, x_mem_max_pss_gb, x_mem_max_rss_gb, x_mem_max_vmem_gb, x_mem_ru_maxrss_gb, x_scheduler, x_worker_might_sectohs06sec, x_worker_model_type, x_worker_physical_virtual, x_worker_processors_thread_count, x_worker_product_name`

The semantics of each column is described in the table below:

| Column                         | Semantics                                                                                    |
|:-------------------------------|:---------------------------------------------------------------------------------------------|
| **`account`**                  | account string associated to the job, specified at submission time via `qsub(1)`, see `accounting(5)` |
| **`ar_submission_time`**       | advance reservation submission time, see `accounting(5)` |
| **`arid`**                     | advance reservation identifier, see `accounting(5)` |
| **`category`**                 | a string specifying the job category, see `accounting(5)` |
| **`cpu`**                      | number of **scaled CPU seconds (integrated over all the CPU slots allocated for this job)** this job was in executing state. Note that the scaling factor is specific for each worker node and can be retrieved via `qconf -se <worker hostname>` (field `usage_scaling`). The value of the scaling factor is available in column `x_worker_might_sectohs06sec`. The number of slots allocated for this job is available in the column `slots` (see below). See `accounting(5)` |
| **`cwd`**                      | working  directory the job ran in,  see `accounting(5)` |
| **`department`**               | the department which was assigned to the job, see `accounting(5)` |
| **`exit_status`**              | exit status of the job script, see `accounting(5)` |
| **`failed`**                   | indicates the problem which occurred in case a job could not be started, see `accounting(5)` |
| **`granted_pe`**               | parallel environment which was selected for that job, see `accounting(5)` |
| **`group`**                    | effective group id of the job owner when executing the job, see `accounting(5)` |
| **`hostname`**                 | name of the execution host, see `accounting(5)` |
| **`io`**                       | amount of data read and written by the job, in GBytes, see `accounting(5)` |
| **`ioops`**                    | number of io operations, see `accounting(5)` |
| **`iow`**                      | I/O wait time in seconds, see `accounting(5)` |
| **`job_class`**                | if the job has been running in a job class, the name of the job class, see `accounting(5)` |
| **`job_name`**                 | job name (or `QLOGIN` if this is an interactive job), see `accounting(5)` |
| **`job_number`**               | job identifier, see `accounting(5)` |
| **`maxpss_GB`**                | maximum proportional set size in GBytes, see `accounting(5)` |
| **`maxrss_GB`**                | maximum resident set size in GBytes, see `accounting(5)` |
| **`maxvmem_GB`**               | the maximum vmem size in GBytes, see `accounting(5)` |
| **`mem`**                      | integral memory usage in GBytes CPU seconds, see `accounting(5)` |
| **`slots`**                    | number of (job) slots which were dispatched to the job by the scheduler, see `accounting(5)` |
| **`owner`**                    | user name who submitted the job, see `accounting(5)` |
| **`pe_taskid`**                | if this identifier is set the task was part of a parallel job, see `accounting(5)` |
| **`priority`**                 | priority value assigned to the job, see `accounting(5)` |
| **`project`**                  | the project which was assigned to the job, see `accounting(5)` |
| **`qdel_info`**                | if the job has been deleted via `qdel`, username@hostname, else `NONE`, see `accounting(5)` |
| **`queue_name`**               | name of the cluster queue in which the job has run, see `qname` in `accounting(5)` |
| **`ru_*`**                     | standard UNIX rusage structure as described in `getrusage(2)`. Fields are: `ru_idrss`, `ru_inblock`, `ru_ismrss`, `ru_isrss`, `ru_ixrss`, `ru_majflt`, `ru_maxrss_GB`, `ru_minflt`, `ru_msgrcv`, `ru_msgsnd`, `ru_nivcsw`, `ru_nsignals`, `ru_nswap`, `ru_nvcsw`, `ru_oublock`, `ru_stime`, `ru_utime`. See `accounting(5)` |
| **`ru_wallclock_sec`**         | wallclock execution time of the job, in seconds, see `accounting(5)` |
| **`submit_cmd`**               | the command line used for job submission, see `accounting(5)` |
| **`submit_host`**              | submit host name, see `accounting(5)` |
| **`task_number`**              | array job task index number, see `accounting(5)` |
| **`wallclock`**                | wallclock time this job spent in running state. Unit is seconds. See `accounting(5)` |
| |
|  **EXTENDED COLUMN**            | **SEMANTICS** |
| **`x_cputime_hs06sec`**         | number of normalized seconds this job was in execution state, integrated over all the slots allocated for this job (see `slots`). The unit is HS06.sec |
| **`x_cputime_sec`**             | CPU time (in seconds) consumed by this job, integrated over all the slots allocated to the job (see `slots`). **Warning**: this CPU time is not normalized. See also `x_cputime_hs06sec` |
| **`x_limits_s_cpu`**            | ??? |
| **`x_limits_s_rss_gb`**         | ??? |
| **`x_limits_s_vmem_gb`**        | ??? |
| **`x_mem_max_pss_gb`**          | maximum proportional set size in GBytes, see `accounting(5)` and `getrusage(2)` |
| **`x_mem_max_rss_gb`**          | maximum resident set size in GBytes, see `accounting(5)` and `getrusage(2)` |
| **`x_mem_max_vmem_gb`**         | the maximum vmem size in GBytes, see `accounting(5)` and `getrusage(2)` |
| **`x_mem_ru_maxrss_gb`**        | ??? |
| **`x_scheduler`**               | Identifier of the batch job scheduler (typically 'UGE') |
| **`x_worker_might_sectohs06sec`**   | CPU scaling factor for the worker node this job executed on. This factor is used to compute the execution time of this job in normalized units, namely in HS06.sec. |
| **`x_worker_model_type`**       | CPU model of the worker node where the job executed, e.g. 'Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz' |
| **`x_worker_physical_virtual`** | is the worker node a 'physical' or 'virtual' machine? |
| **`x_worker_processors_thread_count`**        | number of (maybe virtual) CPU cores in this worker node |
| **`x_worker_product_name`**     | worker node product name, as assigned by vendor e.g. 'PowerEdge C6220 II' |

In [7]:
df.head()

Unnamed: 0,account,ar_submission_time,arid,category,cpu,cwd,department,end_time,exit_status,failed,...,x_mem_max_pss_gb,x_mem_max_rss_gb,x_mem_max_vmem_gb,x_mem_ru_maxrss_gb,x_scheduler,x_worker_might_sectohs06sec,x_worker_model_type,x_worker_physical_virtual,x_worker_processors_thread_count,x_worker_product_name
0,sge,0,0,"-U LSST,demonqueue,gpuqueue,longlastinggpuqueu...",980966.628,NONE,defaultdepartment,2017-12-31 23:01:47,1,0,...,2.016441,5.231304,21.390625,0.588039,UGE,10.43,Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz,physical,40,PowerEdge C6220 II
1,sge,0,0,"-U LSST,demonqueue,gpuqueue,longlastinggpuqueu...",745324.376,NONE,defaultdepartment,2017-12-31 23:02:13,1,0,...,1.628927,5.221485,23.656815,0.584755,UGE,11.1,Intel(R) Xeon(R) CPU E5-2680 v3 @ 2.50GHz,physical,48,PowerEdge C6320
2,sge,0,0,"-U LSST,demonqueue,gpuqueue,longlastinggpuqueu...",791662.382,NONE,defaultdepartment,2017-12-31 23:02:13,1,0,...,3.545083,5.172054,19.32045,0.579376,UGE,0.0,,,0,
3,sge,0,0,"-U LSST,demonqueue,gpuqueue,longlastinggpuqueu...",744737.809,NONE,defaultdepartment,2017-12-31 23:02:22,1,0,...,1.543235,5.241722,23.670475,0.590885,UGE,10.87,Intel(R) Xeon(R) CPU E5-2650 v4 @ 2.20GHz,physical,48,PowerEdge C6320
4,sge,0,0,"-U LSST,demonqueue,gpuqueue,longlastinggpuqueu...",785981.591,NONE,defaultdepartment,2017-12-31 23:04:01,1,0,...,2.528771,5.143364,23.63821,0.578518,UGE,11.1,Intel(R) Xeon(R) CPU E5-2680 v3 @ 2.50GHz,physical,48,PowerEdge C6320
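
The rows above include one record whose scaling factor (`x_worker_might_sectohs06sec`) is `0.0`, presumably because the worker node information is missing. The sketch below, which is an illustration and not part of the original notebook, converts the scaled `cpu` value back to plain CPU seconds using the values of the first three rows, and treats a zero factor as unknown so that the division does not produce an infinite value. The column name `_cpu_sec` is hypothetical.

```python
import numpy as np
import pandas as pd

# Values of 'cpu' and 'x_worker_might_sectohs06sec' copied from the first three rows shown above
sample = pd.DataFrame({
    'cpu': [980966.628, 745324.376, 791662.382],
    'x_worker_might_sectohs06sec': [10.43, 11.1, 0.0],
})

# Treat a zero factor as "unknown" so that the division yields NaN instead of inf
factor = sample['x_worker_might_sectohs06sec'].replace(0.0, np.nan)
sample['_cpu_sec'] = sample['cpu'] / factor

print(sample)
```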


### Dataframe extension

Extend the dataframe by computing convenient values and make sure the columns referring to a timestamp are interpreted by Pandas as a timestamp:

In [8]:
# Make sure columns 'submission_time', 'start_time' and 'end_time' are timestamps
for col in ('submission_time', 'start_time', 'end_time'):
    df[col] = df[col].astype('datetime64[s]')

# Compute a new column which contains the waiting time in queue for each job
# We prefix each new column name with '_' to visually distinguish it from the columns in the
# original data files
df['_waiting_time_sec'] = (df['start_time'] - df['submission_time']).dt.total_seconds()

# Modify the CPU model to remove '(R)'
df['x_worker_model_type'] = df['x_worker_model_type'].str.replace('(R)', '', regex=False)
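
As a small worked example of the waiting time computation, the sketch below builds two made-up job records, converts their timestamps and derives the time spent in queue. It uses `pd.to_datetime` for the conversion, which is a substitute for the `astype('datetime64[s]')` call used in the cell above; the timestamps are invented for illustration.

```python
import pandas as pd

# Two made-up jobs: the first waits 45 seconds, the second waits 2 hours
jobs = pd.DataFrame({
    'submission_time': ['2018-10-25 08:00:00', '2018-10-25 09:30:00'],
    'start_time':      ['2018-10-25 08:00:45', '2018-10-25 11:30:00'],
})
for col in ('submission_time', 'start_time'):
    jobs[col] = pd.to_datetime(jobs[col])

# Subtracting two timestamp columns gives timedeltas, converted here to seconds
jobs['_waiting_time_sec'] = (jobs['start_time'] - jobs['submission_time']).dt.total_seconds()
print(jobs['_waiting_time_sec'].tolist())
```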

### Filter dataframe contents

Only consider jobs by owners and groups of interest. At CC-IN2P3, jobs for the organized data processing activities of LSST and DESC are executed as users `lsstprod` and `descprod`, respectively. The accounts for members of LSST/DESC belong to the group `lsst`.

In [9]:
# Select the users and groups of interest by specifying them in a tuple. Use an empty tuple
# if no selection is needed
users = ('descprod',)
groups = ('lsst',)

if users:
    df = df[df['owner'].isin(users)]

if groups:
    df = df[df['group'].isin(groups)]
    
def user_filter_description(users, groups):
    res = '(all users' if not users else f"(users: {','.join(users)}"
    res += ', all groups)' if not groups else f", groups: {','.join(groups)})"
    return res

print_md(f'Number of jobs for {user_filter_description(users, groups)}: **{df.shape[0]}**')

Number of jobs for (users: descprod, groups: lsst): **93944**

Select the time interval of jobs of interest. We only consider jobs submitted within that interval.

In [10]:
def filter_by_date(dframe, start=None, end=None):
    """Select the job records in the dataframe 'dframe' whose submission time is between 'start' and 'end'
    (both bounds included). A bound set to None is ignored.
    """
    if start:
        dframe = dframe[dframe['submission_time'] >= start]
    if end:
        dframe = dframe[dframe['submission_time'] <= end]
    return dframe


# Use a specific date such as
#    datetime.datetime(2018, 10, 25)
# to specify the start and end dates of interest. Alternatively, use None for processing
# all the available job records.
start_date = None
end_date = None
df = filter_by_date(df, start_date, end_date)
print_md(f'Number of job records in the selected period: **{df.shape[0]}**')

Number of job records in the selected period: **93944**
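
The sketch below shows how the date filter behaves on a few made-up records. The function is copied from the cell above so that the example is self-contained; the job numbers and dates are invented. Note that a bound such as `datetime.datetime(2018, 10, 31)` means midnight at the start of that day, so a job submitted later on October 31st is not selected.

```python
import datetime

import pandas as pd

def filter_by_date(dframe, start=None, end=None):
    """Copy of the notebook helper: keep jobs submitted between 'start' and 'end'."""
    if start:
        dframe = dframe[dframe['submission_time'] >= start]
    if end:
        dframe = dframe[dframe['submission_time'] <= end]
    return dframe

# Made-up submission times around the bounds of the interval
jobs = pd.DataFrame({
    'job_number': [1, 2, 3, 4],
    'submission_time': pd.to_datetime([
        '2018-10-24 23:59:59',   # just before the start bound
        '2018-10-25 00:00:00',   # exactly on the start bound (included)
        '2018-10-31 10:00:00',   # after midnight of the end bound (excluded)
        '2018-11-02 12:00:00',   # after the end bound
    ]),
})

selected = filter_by_date(
    jobs,
    start=datetime.datetime(2018, 10, 25),
    end=datetime.datetime(2018, 10, 31),
)
print(selected['job_number'].tolist())
```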

### Helper functions and variables

In [11]:
def format_pct_hours(v):
    """Format v to be displayed as a percentile, in hours. Argument v is expected to be seconds.
    """
    return "<1" if v < 3600.0 else f'{int(v/3600.0)}'

def format_pct_min(v):
    """Format v to be displayed as a percentile, in minutes. Argument v is expected to be seconds.
    """
    return "<1" if v < 60.0 else f'{int(v/60.0)}'

def style_figure(fig, x_font_style='bold', x_label_standoff=15, y_font_style='bold', y_label_standoff=15):
    """Style some attributes of a Bokeh figure
    """
    # Figure size (in screen units)
    fig.plot_width = 800
    fig.plot_height = 600
    
    # Autohide figure toolbar
    fig.toolbar.autohide = True

    # Figure background fill color and alpha
    fig.background_fill_color = 'whitesmoke'
    fig.background_fill_alpha = 0.8
    
    # Axis properties
    fig.xaxis.axis_label_text_font_style = x_font_style
    fig.xaxis.axis_label_standoff =  x_label_standoff
    fig.yaxis.axis_label_text_font_style = y_font_style
    fig.yaxis.axis_label_standoff =  y_label_standoff
    
# Definition of the batch queues
batch_queues = {
    # Interactive queues
    'interactive': ('interactive', 'mc_interactive'),
    
    # Queues which accept single-node jobs (either single CPU core or multi-CPU core)
    'single-core': ('huge', 'long', 'longlasting'),
    'multi-core':  ('mc_highmem_huge', 'mc_highmem_long', 'mc_huge', 'mc_long', 'mc_longlasting'),
    
    # Queues which accept multi-node jobs
    'multi-node':  ('pa_gpu_long', 'pa_long', 'pa_longlasting', 'pa_medium'),
    'gpu':         ('mc_gpu_interactive', 'mc_gpu_long', 'mc_gpu_longlasting', 'mc_gpu_medium'),
}
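
When analysing individual jobs it can be handy to go from a queue name to its category. The following is a minimal sketch of such a reverse lookup; the name `queue_category` is hypothetical, and the dictionary is a copy of the one defined above so that the example is self-contained.

```python
# Copy of the queue definitions used in this notebook
batch_queues = {
    'interactive': ('interactive', 'mc_interactive'),
    'single-core': ('huge', 'long', 'longlasting'),
    'multi-core':  ('mc_highmem_huge', 'mc_highmem_long', 'mc_huge', 'mc_long', 'mc_longlasting'),
    'multi-node':  ('pa_gpu_long', 'pa_long', 'pa_longlasting', 'pa_medium'),
    'gpu':         ('mc_gpu_interactive', 'mc_gpu_long', 'mc_gpu_longlasting', 'mc_gpu_medium'),
}

# Invert the mapping: queue name -> category
queue_category = {
    queue: category
    for category, queues in batch_queues.items()
    for queue in queues
}

print(queue_category['mc_huge'])
print(queue_category.get('unknown_queue', 'n/a'))
```

With a dataframe at hand, `df['qname'].map(queue_category)` would give the category of each job.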

---
## Batch system service

In this section we analyse the behavior of jobs and the batch system in general terms.

### Activity overview

In this sub-section we present the periods of batch activity.

In [12]:
start_period, end_period = df['submission_time'].min(), df['submission_time'].max()
num_jobs = df.shape[0]
print_md(f'From {start_period.date()} to {end_period.date()} there were {num_jobs} jobs submitted by {user_filter_description(users, groups)}')

From 2018-01-09 to 2018-12-16 there were 93944 jobs submitted by (users: descprod, groups: lsst)

In [13]:
# Count submitted and terminated jobs per day
df['_submission_date'] = df['submission_time'].apply(lambda dt: dt.date())
submitted = df.groupby(['_submission_date']).size()

df['_end_date'] = df['end_time'].apply(lambda dt: dt.date())
terminated = df.groupby(['_end_date']).size()

fig = bkh.figure(
    title = f'BATCH ACTIVITY OVERVIEW {user_filter_description(users, groups)}',
    x_axis_label = 'date',
    x_axis_type = 'datetime',
    y_axis_label = 'job count'
)
style_figure(fig)

color = 'steelblue'
fig.circle(x=submitted.index, y=submitted.values, fill_color=color, size=8, legend="submitted")
fig.line(x=submitted.index, y=submitted.values, line_color=color, legend="submitted")

color = 'red'
fig.circle(x=terminated.index, y=terminated.values, fill_color=color, size=8, legend="terminated")
fig.line(x=terminated.index, y=terminated.values, line_color=color, legend="terminated")

fig.xaxis.formatter = bkhmodels.formatters.DatetimeTickFormatter(
    days=["%Y-%m-%d"], months=["%Y-%m-%d"], years=["%Y-%m-%d"]
)
fig.yaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,0")
fig.add_tools(bkhmodels.HoverTool(
    tooltips = [
        ('date', '@x{%F}'),
        ('jobs', '@y{0,000}'),
    ],
    formatters = {
        'x': 'datetime',
    },
    mode = 'mouse',
))
fig.legend.location = 'top_left'
fig.legend.click_policy = 'hide'
bkh.show(fig)
print_md(f"""*The figure above shows the number of jobs submitted and terminated per day.*""")

*The figure above shows the number of jobs submitted and terminated per day.*

### Execution queues

Show the activity per batch queue. Each queue has its own characteristics in terms of CPU time, memory size, etc. You can see the details of each queue [here](http://cctools.in2p3.fr/mrtguser/info_sge_queue.php).

In [14]:
queues = df.groupby(['qname']).size().sort_values(ascending=False)
labels = list(queues.index)
values = queues.values

fig = bkh.figure(
    title = f'NUMBER OF JOBS PER EXECUTION QUEUE {user_filter_description(users, groups)}',
    x_axis_label = 'queue',
    y_axis_label = 'job count',
    x_range = labels
)
style_figure(fig)
color, alpha = 'darkorange', 0.7
fig.vbar(x=labels, top=values, width=0.7, alpha=alpha, fill_color=color, line_color=color)
fig.xgrid.grid_line_color = None
fig.yaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,0")
fig.add_tools(bkhmodels.HoverTool(
    tooltips = [
        ('queue', '@x'),
        ('jobs', '@top{0,000}'),
    ],
    mode = 'mouse',
))
bkh.show(fig)
print_md(f"""*The figure above shows the number of jobs executed per batch system queue. Only queues
    where at least one job was put for execution are shown.*""")

*The figure above shows the number of jobs executed per batch system queue. Only queues
    where at least one job was put for execution are shown.*

### Successful and failed jobs

Compute the percentage of jobs which succeeded and failed. A job is considered failed if its exit status is not zero.

In [15]:
def format_failed_pct(v):
    return '{:.0%}'.format(v) if v > 0.0 else 'n/a'

def format_job_count(count):
    return '{:,}'.format(count)

d = {}
queues = df.groupby(['qname']).size().sort_values(ascending=False)
for q in queues.index:
    job_count = queues[q]
    succeeded = df.loc[(df['qname'] == q) & (df['exit_status'] == 0)].shape[0]
    d[q] = {
        'job_count': job_count,
        'succeeded': succeeded/job_count if job_count > 0 else 0,
        'failed':    (job_count - succeeded)/job_count if job_count > 0 else 0,
    }

table = """
### Percentage of failed jobs (`exit_status != 0`)

| Queue | Job Count | Succeeded | Failed |
| ----: | --------: | --------: | -----: |
"""

for k, v in d.items():
    table += f'| **{k}** | {format_job_count(v["job_count"])} | {format_failed_pct(v["succeeded"])} | {format_failed_pct(v["failed"])} |\n'

table += """
*The table above shows the fraction of succeeded and failed jobs per queue. A job is considered failed if its `exit_status` is not zero.*
"""    
print_md(table)


### Percentage of failed jobs (`exit_status != 0`)

| Queue | Job Count | Succeeded | Failed |
| ----: | --------: | --------: | -----: |
| **long** | 39,812 | 99% | 1% |
| **mc_long** | 24,845 | 100% | 0% |
| **huge** | 21,004 | 93% | 7% |
| **mc_huge** | 8,282 | 97% | 3% |
| **interactive** | 1 | n/a | 100% |

*The table above shows the fraction of succeeded and failed jobs per queue. A job is considered failed if its `exit_status` is not zero.*
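
The same fractions can be obtained with a single `groupby`, since the mean of a boolean column is the fraction of `True` values. The sketch below is an alternative formulation on made-up data, not the code that produced the table above.

```python
import pandas as pd

# Made-up jobs: 4 in queue 'long' (1 failed), 2 in queue 'huge' (1 failed)
jobs = pd.DataFrame({
    'qname': ['long', 'long', 'long', 'long', 'huge', 'huge'],
    'exit_status': [0, 0, 0, 1, 0, 137],
})

summary = (
    jobs.assign(succeeded=jobs['exit_status'] == 0)
        .groupby('qname')['succeeded']
        .agg(['count', 'mean'])
        .rename(columns={'count': 'job_count', 'mean': 'succeeded'})
)
summary['failed'] = 1.0 - summary['succeeded']
print(summary)
```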


### Waiting time

Show the distribution of the time the jobs spent waiting in the queue before being put in execution on a compute node.

In [16]:
fig = bkh.figure(
    title = f'DISTRIBUTION OF JOB WAITING TIME IN QUEUE {user_filter_description(users, groups)}',
    x_axis_label = 'waiting time (min)',
    y_axis_label = 'job count',
)
style_figure(fig)
color, alpha = 'firebrick', 0.7
hist, edges = np.histogram(df['_waiting_time_sec']/60, density=False, bins=30)
fig.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:], alpha=alpha, fill_color=color, line_color=color)
fig.xaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,0")
fig.yaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,0")
fig.add_tools(bkhmodels.HoverTool(
    tooltips = [
        ('jobs', '@top{0,000}'),
        ('low',  '@left{0.0} min'),
        ('high', '@right{0.0} min'),
    ],
    mode = 'mouse',
))
bkh.show(fig)
print_md(f"""*The figure above shows the distribution of the time the jobs waited in queue before
    being put in execution by the job scheduler.*""")

*The figure above shows the distribution of the time the jobs waited in queue before
    being put in execution by the job scheduler.*

In [17]:
d = {}
queues = df.groupby(['qname']).size().sort_values(ascending=False)
for q in queues.index:
    selection = df.loc[df['qname'] == q]
    d[q] = {
        'job_count': selection.shape[0],
        'pcts': np.percentile(selection['_waiting_time_sec'], [50, 80, 98, 100])
    }

table = """
### Waiting time percentiles (*minutes*)

| Queue | Job Count | 50pct | 80pct | 98pct | 100pct |
| ----: | --------: | ----: | ----: | ----: | -----: |
"""

for k, v in d.items():
    table += f'| **{k}** | {format_job_count(v["job_count"])} | {format_pct_min(v["pcts"][0])} | {format_pct_min(v["pcts"][1])} | {format_pct_min(v["pcts"][2])} | {format_pct_min(v["pcts"][3])} |\n'    

table += """
*The table above shows some percentiles for waiting time for each execution queue (unit is minutes).*
"""    
print_md(table)


### Waiting time percentiles (*minutes*)

| Queue | Job Count | 50pct | 80pct | 98pct | 100pct |
| ----: | --------: | ----: | ----: | ----: | -----: |
| **long** | 39,812 | <1 | 14 | 193 | 527 |
| **mc_long** | 24,845 | 62 | 88 | 2172 | 4205 |
| **huge** | 21,004 | 1 | 8 | 57 | 517 |
| **mc_huge** | 8,282 | 159 | 429 | 3223 | 8849 |
| **interactive** | 1 | <1 | <1 | <1 | <1 |

*The table above shows some percentiles for waiting time for each execution queue (unit is minutes).*
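
Per-queue percentiles can also be computed without an explicit loop. The sketch below, on made-up waiting times, uses the `quantile` method of a pandas group, which by default interpolates linearly like `np.percentile`. It is an alternative formulation, not the code used for the table above.

```python
import numpy as np
import pandas as pd

# Made-up waiting times (seconds) for two queues
jobs = pd.DataFrame({
    'qname': ['long'] * 5 + ['huge'] * 3,
    '_waiting_time_sec': [30, 60, 120, 600, 6000, 10, 20, 4000],
})

# One row per queue, one column per requested quantile
pcts = (
    jobs.groupby('qname')['_waiting_time_sec']
        .quantile([0.5, 0.8, 0.98, 1.0])
        .unstack()
)
print(pcts)
```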


### Execution time

Show the distribution of the wallclock time the jobs spent in execution.

In [18]:
fig = bkh.figure(
    title = f'DISTRIBUTION OF JOB EXECUTION WALLCLOCK TIME {user_filter_description(users, groups)}',
    x_axis_label = 'execution wallclock time (hours)',
    y_axis_label = 'job count'
)
style_figure(fig)
color, alpha = 'darkseagreen', 0.7
hist, edges = np.histogram(df['wallclock']/3600, density=False, bins=40)
fig.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:], alpha=alpha, fill_color=color, line_color=color)
fig.yaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,0")
fig.add_tools(bkhmodels.HoverTool(
    tooltips = [
        ('jobs', '@top{0,000}'),
        ('low',  '@left{0.0} hours'),
        ('high', '@right{0.0} hours'),
    ],
    mode = 'mouse',
))
bkh.show(fig)

In [19]:
d = {}
queues = df.groupby(['qname']).size().sort_values(ascending=False)
for q in queues.index:
    selection = df.loc[df['qname'] == q]
    d[q] = {
        'job_count': selection.shape[0],
        'pcts': np.percentile(selection['wallclock'], [50, 80, 98, 100])
    }

table = """
### Execution time percentiles (*hours*)

| Queue | Job Count | 50pct | 80pct | 98pct | 100pct |
| ----: | --------: | ----: | ----: | ----: | -----: |
"""

for k, v in d.items():
    table += f'| **{k}** | {format_job_count(v["job_count"])} | {format_pct_hours(v["pcts"][0])} | {format_pct_hours(v["pcts"][1])} | {format_pct_hours(v["pcts"][2])} | {format_pct_hours(v["pcts"][3])} | \n'

table += """
*The table above shows some percentiles for job execution wallclock time per execution queue (unit is hours)*
"""
print_md(table)


### Execution time percentiles (*hours*)

| Queue | Job Count | 50pct | 80pct | 98pct | 100pct |
| ----: | --------: | ----: | ----: | ----: | -----: |
| **long** | 39,812 | <1 | 5 | 23 | 58 | 
| **mc_long** | 24,845 | <1 | <1 | 2 | 19 | 
| **huge** | 21,004 | <1 | <1 | 33 | 72 | 
| **mc_huge** | 8,282 | <1 | 1 | 70 | 86 | 
| **interactive** | 1 | <1 | <1 | <1 | <1 | 

*The table above shows some percentiles for job execution wallclock time per execution queue (unit is hours)*
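
When reading the table above, keep in mind that the formatting helper truncates to whole hours rather than rounding. The short sketch below illustrates this with a copy of the helper defined earlier in this notebook; the sample durations are arbitrary.

```python
def format_pct_hours(v):
    """Copy of the notebook helper: v is a duration in seconds."""
    return "<1" if v < 3600.0 else f'{int(v/3600.0)}'

# Durations just below and at the hour boundaries
for seconds in (1800, 3600, 7199, 7200):
    print(seconds, format_pct_hours(seconds))
```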


### CPU efficiency

In this section we compute the CPU efficiency of the jobs. CPU efficiency is the ratio of CPU time to wallclock time, that is, the fraction of the wallclock time during which the job actually used the CPU.

In [20]:
# Compute the non-normalized CPU time for each job slot for jobs satisfying the following conditions:
#    1) The exit status of the job is zero
#    2) The job executed in a single-node batch queue, i.e. jobs executed in the 'interactive' queue are not included
#    3) The job spent at least as much time in execution as the 5th percentile of the jobs satisfying
#       the two previous conditions
queues_to_include = batch_queues['single-core'] + batch_queues['multi-core']
job_set = df.loc[(df['exit_status'] == 0) & (df['qname'].isin(queues_to_include))]
wallclock_threshold = np.percentile(job_set['wallclock'], [5])[0]
job_set = job_set.loc[job_set['wallclock'] >= wallclock_threshold]

# Compute the CPU efficiency
cputime_per_slot = job_set['cpu'] / (job_set['x_worker_might_sectohs06sec'] * job_set['slots'])
cpueff = cputime_per_slot / job_set['wallclock']

hist, edges = np.histogram(cpueff, density=False, bins=40)
fig = bkh.figure(
    title = f'DISTRIBUTION OF CPU EFFICIENCY {user_filter_description(users, groups)}',
    x_axis_label = 'CPU efficiency (CPU time / wallclock time)',
    y_axis_label = 'job count'
)
style_figure(fig)
color, alpha = 'saddlebrown', 0.7
fig.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:], alpha=alpha, fill_color=color, line_color=color)
fig.yaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,0")
fig.add_tools(bkhmodels.HoverTool(
    tooltips = [
        ('jobs', '@top{0,000}'),
        ('low',  '@left{0%}'),
        ('high', '@right{0%}'),
    ],
    mode = 'mouse',
))
bkh.show(fig)
print_md(f"""*The figure above shows the distribution of the CPU efficiency of the selected jobs. Note that some jobs have a CPU efficiency greater than 1.0: this is likely
due to a measurement error in the CPU time collected by GridEngine for the job, or to an under-estimation of the number of CPU cores actually used by the job.*""")

*The figure above shows the distribution of the CPU efficiency of the selected jobs. Note that some jobs have a CPU efficiency greater than 1.0: this is likely
due to a measurement error in the CPU time collected by GridEngine for the job, or to an under-estimation of the number of CPU cores actually used by the job.*
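
As a worked example of the formula used above, the sketch below applies it to two made-up jobs: a single-slot job and an 8-slot job running for the same wallclock time on workers with the same scaling factor. All the values are invented and chosen so that both jobs keep each of their slots busy 90% of the time.

```python
import pandas as pd

# Made-up jobs: 'cpu' is in scaled (HS06) seconds, integrated over all the slots
jobs = pd.DataFrame({
    'cpu':                         [36000.0, 288000.0],
    'x_worker_might_sectohs06sec': [10.0, 10.0],
    'slots':                       [1, 8],
    'wallclock':                   [4000.0, 4000.0],
})

# Undo the scaling and divide by the number of slots: CPU seconds per slot
cputime_per_slot = jobs['cpu'] / (jobs['x_worker_might_sectohs06sec'] * jobs['slots'])

# Fraction of the wallclock time each slot was actually using the CPU
cpueff = cputime_per_slot / jobs['wallclock']
print(cpueff.tolist())
```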

### CPU cores per job

Show the number of job slots that the batch system allocated for each job. This number is a proxy for the number of CPU cores the job requested.

In [21]:
fig = bkh.figure(
    title = f'DISTRIBUTION OF JOB SLOTS {user_filter_description(users, groups)}',
    x_axis_label = 'number of slots',
    y_axis_label = 'job count'
)
style_figure(fig)
color, alpha = 'lightseagreen', 0.7
hist, edges = np.histogram(df['slots'], density=False, bins=10)
fig.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:], alpha=alpha, fill_color=color, line_color=color)
fig.yaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,0")
fig.add_tools(bkhmodels.HoverTool(
    tooltips = [
        ('jobs', '@top{0,000}'),
        ('low',  '@left{0.0} slots'),
        ('high', '@right{0.0} slots'),
    ],
    mode = 'mouse',
))
bkh.show(fig)
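
Since `slots` only takes integer values, an exact tally per slot count is a possible complement to the histogram above, whose 10 equal-width bins generally have non-integer edges. The sketch below uses made-up values and is not part of the original notebook.

```python
import pandas as pd

# Made-up number of slots for six jobs
slots = pd.Series([1, 1, 1, 8, 8, 16], name='slots')

# Number of jobs for each distinct slot count, ordered by slot count
jobs_per_slot_count = slots.value_counts().sort_index()
print(jobs_per_slot_count)
```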

### Memory utilization

In this section we plot the distribution of RAM usage across jobs.

In [22]:
# Only consider successful jobs executed in single- and multi-core queues
queues_to_include = batch_queues['single-core'] + batch_queues['multi-core']
successful_jobs = df.loc[(df['exit_status'] == 0) & (df['qname'].isin(queues_to_include))]

# Create the figure
fig = bkh.figure(
    title = f'DISTRIBUTION OF MEMORY UTILIZATION PER BATCH QUEUE {user_filter_description(users, groups)}',
    x_axis_label = 'gigabyte',
    y_axis_label = 'job count'
)
style_figure(fig)

# Build a histogram per queue
colors = ('dodgerblue', 'darkmagenta', 'mediumseagreen', 'orange', 'crimson', 'sienna')
queues = successful_jobs.groupby(['qname']).size().sort_values(ascending=False)
for q, color in zip(queues.index, colors):
    # Select the jobs in this queue whose wallclock time is at or above the 5th percentile,
    # to exclude jobs which may have terminated early
    job_set = successful_jobs.loc[successful_jobs['qname'] == q]
    wallclock_threshold = np.percentile(job_set['wallclock'], [5])[0]
    memory = job_set.loc[job_set['wallclock'] >= wallclock_threshold, 'x_mem_max_rss_gb']
    hist, edges = np.histogram(memory, density=False, bins=20)
    fig.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:], alpha= 0.5, fill_color=color, line_color=color, legend=q)

# Finalize the figure
fig.legend.click_policy = 'hide'
fig.yaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,0")
fig.add_tools(bkhmodels.HoverTool(
    tooltips = [
        ('jobs', '@top{0,000}'),
        ('low',  '@left{0.0} GB'),
        ('high', '@right{0.0} GB'),
    ],
    mode = 'mouse',
))
bkh.show(fig)
print_md(f"""*The figure above shows the distribution of the RAM the jobs actually used during execution (in gigabytes), as collected by GridEngine. Click on the legend to hide / show information.*""")

*The figure above shows the distribution of the RAM the jobs actually used during execution (in gigabytes), as collected by GridEngine. Click on the legend to hide / show information.*
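
The 5th-percentile cut applied to each queue before building the histogram can be sketched without any third-party package. The `percentile` helper below is a hypothetical stand-in that interpolates linearly between closest ranks, which is the default behaviour of `np.percentile`. The wallclock values are invented for illustration.

```python
def percentile(values, q):
    """q-th percentile (0-100), interpolating linearly between closest ranks."""
    data = sorted(values)
    pos = (len(data) - 1) * q / 100
    lower = int(pos)
    upper = min(lower + 1, len(data) - 1)
    return data[lower] + (data[upper] - data[lower]) * (pos - lower)

# Hypothetical wallclock times (seconds) of the 21 jobs of one queue:
# two very short jobs (2 s and 5 s) and 19 longer ones
wallclock = [2, 5] + [600 * k for k in range(1, 20)]

# Jobs shorter than the 5th percentile are treated as early terminations
threshold = percentile(wallclock, 5)
kept = [w for w in wallclock if w >= threshold]
dropped = [w for w in wallclock if w < threshold]

print(threshold, len(kept), dropped)
```

Because the comparison is `>=`, a job whose wallclock time equals the threshold is kept; only the 2-second job is dropped here.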

In [23]:
def format_mem_pct(v):
    return '<1' if v < 1.0 else f'{int(v)}'

queues = df.groupby(['qname']).size().sort_values(ascending=False)
d = {}
for q in queues.index:
    selection = df[df['qname'] == q]
    d[q] = {
        'job_count': selection.shape[0],
        'pcts': np.percentile(selection['x_mem_max_rss_gb'], [50, 80, 98, 100])
    }

table = """
### Memory usage percentiles (gigabytes)

| Queue | Job Count | 50pct | 80pct | 98pct | 100pct |
| ----: | --------: | ----: | ----: | ----: | -----: |
"""

for k, v in d.items():
    table += f'| {k} | {format_job_count(v["job_count"])} | {format_mem_pct(v["pcts"][0])} | {format_mem_pct(v["pcts"][1])} | {format_mem_pct(v["pcts"][2])} | {format_mem_pct(v["pcts"][3])} | \n'

table += """
*The table above shows some percentiles of memory consumption **per job** for each execution queue (unit is GB).*
"""    
print_md(table)


### Memory usage percentiles (gigabytes)

| Queue | Job Count | 50pct | 80pct | 98pct | 100pct |
| ----: | --------: | ----: | ----: | ----: | -----: |
| long | 39,812 | <1 | <1 | 2 | 8 | 
| mc_long | 24,845 | 8 | 9 | 13 | 31 | 
| huge | 21,004 | <1 | 1 | 2 | 12 | 
| mc_huge | 8,282 | 7 | 20 | 52 | 95 | 
| interactive | 1 | <1 | <1 | <1 | <1 | 

*The table above shows some percentiles of memory consumption **per job** for each execution queue (unit is GB).*


## Estimation of required number of CPU cores

In this section we estimate the number of CPU cores that would be needed to perform the same activity over the considered period. As the reference, we take the most powerful CPU model on which the considered jobs ran.

In [24]:
# Get the value of the maximum CPU time conversion factor
max_might = df['x_worker_might_sectohs06sec'].max()

# Get the model name of a CPU with the max conversion factor (i.e. the most powerful CPU model used for the set of jobs considered)
cpu_model = df.loc[df['x_worker_might_sectohs06sec'] == max_might, 'x_worker_model_type'].unique()[0]

# Get the total amount of normalized CPU time consumed by all the relevant jobs
total_consumed_cputime_hs06sec = df['x_cputime_hs06sec'].sum()

# Compute the number of elapsed (wallclock) seconds in the period
start_period = df['submission_time'].min()
end_period = df['end_time'].max()
elapsed_days = (end_period - start_period).days
elapsed_secs = (end_period - start_period).total_seconds()

# The number of CPU cores (of the most powerful model we have) to perform the same activity
# over the same period of time is computed by dividing the normalized CPU time consumed by
# all the relevant jobs by the scale factor for the most powerful CPU model and dividing
# this by the number of elapsed seconds in the period.
# We assume a CPU efficiency < 1.0 to be more realistic
estimated_cpu_efficiency = 0.6
required_cpu_cores = total_consumed_cputime_hs06sec / (max_might * estimated_cpu_efficiency) / elapsed_secs
print_md(f'The number of cores of **{cpu_model}** required to deliver the same computing capacity over {elapsed_days} days is **{int(required_cpu_cores)}**')

The number of cores of **Intel Xeon Silver 4114 CPU @ 2.20GHz** required to deliver the same computing capacity over 341 days is **96**
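
The arithmetic behind this estimate can be followed on round numbers. The sketch below restates the formula used in the cell above as a function; the function name and all input values are hypothetical and chosen only to keep the computation easy to check by hand.

```python
def required_cores(total_hs06sec, hs06_per_core, cpu_efficiency, elapsed_secs):
    """Cores needed to deliver total_hs06sec of normalized CPU time in elapsed_secs."""
    usable_hs06_per_core = hs06_per_core * cpu_efficiency
    return total_hs06sec / usable_hs06_per_core / elapsed_secs

# Hypothetical inputs, not taken from the accounting records
cores = required_cores(
    total_hs06sec=1.0e9,   # normalized CPU time consumed by all jobs
    hs06_per_core=10.0,    # conversion factor of the reference CPU model
    cpu_efficiency=0.5,    # assumed fraction of wallclock time spent computing
    elapsed_secs=1.0e6,    # length of the period
)
print(cores)
```

With these inputs each core delivers 5 HS06 seconds per elapsed second, so 1.0e9 HS06 seconds over 1.0e6 seconds require 200 cores. Lowering the assumed efficiency raises the estimate proportionally.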

# DESC DC2 Run1.2p jobs

In this section we focus on the jobs for processing the DESC DC2 Run1.2p. All the jobs of that exercise were executed by user `descprod`, and the last iteration had started by 2018-10-01.

## Select the jobs of interest

In [25]:
# Select the period of interest
start_period = datetime.datetime(2018, 10, 1)
end_period = datetime.datetime(2018, 12, 31)

# DESC jobs run as user 'descprod'
dc2_jobs = filter_by_date(df, start=start_period, end=end_period)
# Take a copy so that columns can be added later without a SettingWithCopyWarning
dc2_jobs = dc2_jobs[dc2_jobs['owner'] == 'descprod'].copy()

# Add a label and a period for annotating the plots
dc2_label = 'DESC DC2 Run1.2p'
dc2_period = f'{start_period.strftime("%Y-%m-%d")} to {end_period.strftime("%Y-%m-%d")}'

## Label jobs according to their role in the pipeline

First, we label the jobs according to their purpose, using the submission command, which contains clues about the type of job. Some jobs also carry their type in the `account` field, but this was introduced late in the process, so not all jobs are labeled this way.

We add a column which contains the kind of job, named `_job_category`.

In [26]:
dc2_jobs['_job_category'] = ''

patterns = {
    'ingest_flat':             re.compile(r'.*/ingest_flat/.*'),
    'ingest_bias':             re.compile(r'.*/ingest_bias/.*'),
    'ingest_dark':             re.compile(r'.*/ingest_dark/.*'),
    'ingestRefCat':            re.compile(r'.*/ingestRefCat/.*'),
    'ingestData':              re.compile(r'.*/ingestData/.*'),
    'setup_ingest':            re.compile(r'.*/setup_ingest/.*'),
    'run_ingestData':          re.compile(r'.*/run_ingestData/.*'),

    'setup_calib':             re.compile(r'.*/setup_calib/.*'),
    'run_bias':                re.compile(r'.*/run_bias/.*'),
    'run_flat':                re.compile(r'.*/run_flat/.*'),
    'run_dark':                re.compile(r'.*/run_dark/.*'),
    'setup_bias':              re.compile(r'.*/setup_bias/.*'),
    'setup_flat':              re.compile(r'.*/setup_flat/.*'),
    'setup_dark':              re.compile(r'.*/setup_dark/.*'),
    'finish_calib':            re.compile(r'.*/finish_calib/.*'),

    'setup_SingleFrameDriver': re.compile(r'.*/setup_SingleFrameDriver/.*'),
    'run_SingleFrameDriver':   re.compile(r'.*/run_SingleFrameDriver/.*'),
    'setup_coadd':             re.compile(r'.*/setup_coadd/.*'),
    'run_coaddDriver':         re.compile(r'.*/run_coaddDriver/.*'),
    'setup_multiBand':         re.compile(r'.*/setup_multiBand/.*'),
    'run_multiBandDriver':     re.compile(r'.*/run_multiBandDriver/.*'),
    'makeSkyMap':              re.compile(r'.*/makeSkyMap/.*'),
}

def get_job_type(submit_cmd):
    for cat, rex in patterns.items():
        if rex.search(submit_cmd):
            return cat
    return ''

dc2_jobs['_job_category'] = dc2_jobs['submit_cmd'].apply(get_job_type)
unlabeled = dc2_jobs[dc2_jobs['_job_category'] == ''].shape[0]
print_md(f'There are {unlabeled} unlabeled jobs, out of {dc2_jobs.shape[0]} DC2 jobs')

There are 0 unlabeled jobs, out of 13519 DC2 jobs
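
The matching logic can be tried in isolation on a few strings. The sketch below reuses two of the patterns above; the submission commands are hypothetical, since the real `submit_cmd` values are not shown in this notebook.

```python
import re

# Two of the patterns defined in the cell above
patterns = {
    'ingestData':     re.compile(r'.*/ingestData/.*'),
    'run_ingestData': re.compile(r'.*/run_ingestData/.*'),
}

def get_job_type(submit_cmd):
    """Return the first category whose pattern is found in the command, else ''."""
    for category, rex in patterns.items():
        if rex.search(submit_cmd):
            return category
    return ''

# Hypothetical submission commands, for illustration only
commands = [
    'qsub /hypothetical/pipeline/run_ingestData/job.sh',
    'qsub /hypothetical/pipeline/ingestData/job.sh',
    'qsub /hypothetical/other/job.sh',
]
labels = [get_job_type(cmd) for cmd in commands]
print(labels)
```

Because each pattern requires a slash on both sides of the category name, a path containing `/run_ingestData/` is not mistaken for `ingestData`, even though `ingestData` is tested first.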

## Number of jobs per category

In [27]:
job_categories = dc2_jobs.groupby('_job_category').size().sort_values(ascending=False)
labels = list(job_categories.index)
# Use .values rather than get_values(), which recent pandas releases no longer provide
values = job_categories.values
fig = bkh.figure(
    title = f'{dc2_label} — NUMBER OF JOBS PER CATEGORY [{dc2_period}]',
    x_axis_label = 'job category',
    y_axis_label = 'job count',
    y_axis_type = 'log',
    x_range = labels
)
style_figure(fig)
color, alpha = 'orange', 0.8
fig.vbar(x=labels, top=values, bottom=0.1, width=0.6, alpha=alpha, fill_color=color, line_color=color)
fig.xgrid.grid_line_color = None
fig.xaxis.major_label_orientation = 0.9
fig.yaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,0")
fig.add_tools(bkhmodels.HoverTool(
    tooltips = [
        ('job type', '@x'),
        ('jobs', '@top{0,000}'),
    ],
    mode = 'mouse',
))
bkh.show(fig)
print_md(f"""*The figure above shows the number of DC2 jobs per job category.
Hover the mouse over each vertical bar to get the exact number of jobs.*""")

*The figure above shows the number of DC2 jobs per job category.
Hover the mouse over each vertical bar to get the exact number of jobs.*

## CPU utilization per job category

In [28]:
# Get the value of the maximum CPU time conversion factor
max_might = df['x_worker_might_sectohs06sec'].max()

# Get the model name of a CPU with the max conversion factor (i.e. the most powerful CPU model used for the set of jobs considered)
cpu_model = df.loc[df['x_worker_might_sectohs06sec'] == max_might, 'x_worker_model_type'].unique()[0]

# Get the aggregated CPU time per job category and normalize it to the most powerful CPU used
cpu_time_per_category = dc2_jobs.groupby('_job_category')['x_cputime_hs06sec'].sum() / (max_might * 3600)
cpu_time_per_category = cpu_time_per_category.sort_values(ascending=False)
cpu_time_per_category = cpu_time_per_category[cpu_time_per_category >= 1.0]
labels = cpu_time_per_category.index.values
values = cpu_time_per_category.values
fig = bkh.figure(
    title = f'{dc2_label} — DISTRIBUTION OF CPU UTILIZATION [{dc2_period}]',
    x_axis_label = 'job category',
    y_axis_label = f'Hours on {cpu_model}',
    y_axis_type = 'log',
    x_range = labels
)
style_figure(fig)
color, alpha = 'dodgerblue', 0.8
fig.vbar(x=labels, top=values, width=0.6, bottom=1, alpha=alpha, fill_color=color, line_color=color)
fig.xgrid.grid_line_color = None
fig.xaxis.major_label_orientation = 0.9
fig.yaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,0")
fig.add_tools(bkhmodels.HoverTool(
    tooltips = [
        ('job type', '@x'),
        ('CPU time', '@top{0,0.000}'),
    ],
    mode = 'mouse',
))
bkh.show(fig)
print_md(f"""*The figure above shows the aggregated CPU time used by DC2 jobs per job category.
Only the job categories for which the CPU consumption is at least 1 hour are shown.
Hover the mouse over each vertical bar to get the CPU hours consumed by each category*""")

*The figure above shows the aggregated CPU time used by DC2 jobs per job category.
Only the job categories for which the CPU consumption is at least 1 hour are shown.
Hover the mouse over each vertical bar to get the CPU hours consumed by each category*
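
The conversion from normalized CPU time to hours on the reference CPU, followed by the one-hour cut, can be sketched in plain Python. The conversion factor and the per-job records below are hypothetical; only the category names come from the notebook.

```python
import collections

# Hypothetical conversion factor of the reference CPU (CPU seconds to HS06 seconds)
max_might = 10.0

# Hypothetical (category, normalized CPU time in HS06 seconds) records, one per job
records = [
    ('run_coaddDriver', 72000.0),
    ('run_coaddDriver', 36000.0),
    ('makeSkyMap', 18000.0),
    ('run_SingleFrameDriver', 360000.0),
]

# Sum the normalized CPU time of the jobs of each category
hs06sec_per_category = collections.defaultdict(float)
for category, hs06sec in records:
    hs06sec_per_category[category] += hs06sec

# Convert to hours on the reference CPU
hours_per_category = {
    category: hs06sec / (max_might * 3600)
    for category, hs06sec in hs06sec_per_category.items()
}

# Keep the categories with at least 1 hour, largest consumer first
shown = sorted(
    ((c, h) for c, h in hours_per_category.items() if h >= 1.0),
    key=lambda item: item[1],
    reverse=True,
)
print(shown)
```

Here `makeSkyMap` accounts for half an hour on the reference CPU and is therefore left out of the plotted categories.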

## CPU and memory utilization per job category

In [52]:
# Get the value of the maximum CPU time conversion factor
max_might = df['x_worker_might_sectohs06sec'].max()

# Get the model name of a CPU with the max conversion factor (i.e. the most powerful CPU model used for the set of jobs considered)
cpu_model = df.loc[df['x_worker_might_sectohs06sec'] == max_might, 'x_worker_model_type'].unique()[0]

# Only consider jobs of selected categories
categories = ('run_multiBandDriver', 'run_SingleFrameDriver', 'run_coaddDriver')
job_set = dc2_jobs[dc2_jobs['_job_category'].isin(categories)]

fig = bkh.figure(
    title = f'{dc2_label} — CPU & RAM UTILIZATION PER JOB CATEGORY [{dc2_period}]',
    x_axis_label = 'RAM (gigabyte)',
    y_axis_label = f'Hours on {cpu_model}',
)
style_figure(fig)

colors = ('dodgerblue', 'darkmagenta', 'mediumseagreen', 'orange', 'crimson', 'sienna')
for cat, color in zip(categories, colors):
    jobs_in_category = job_set.loc[job_set['_job_category'] == cat]
    data = bkhmodels.ColumnDataSource({
        'memory': jobs_in_category['x_mem_max_rss_gb'],
        'cpu':    jobs_in_category['x_cputime_hs06sec']/(max_might * 3600),
        'jobid':  jobs_in_category['job_number'],
        'queue':  jobs_in_category['qname'],
        'slots':  jobs_in_category['slots'],
    })
    fig.scatter(x='memory', y='cpu', source=data, fill_color=color, fill_alpha=0.5, line_color=color, legend=cat)

fig.yaxis.formatter = bkhmodels.formatters.NumeralTickFormatter(format="0,000")
fig.legend.location = "top_left"
fig.legend.click_policy = 'hide'
fig.add_tools(bkhmodels.HoverTool(
    tooltips = [
        ('JobID', '@jobid'),
        ('CPU', '@cpu'),
        ('RAM', '@memory'),
        ('Queue', '@queue'),
        ('Slots', '@slots'),
    ],
    mode = 'mouse',
))
bkh.show(fig)
print_md(f"""*The figure above shows the normalized CPU time used vs RAM used by DC2 jobs per job category.
Each dot represents one job. The CPU time is normalized to the CPU model named on the vertical axis and is aggregated over all the CPU slots the job requested at submission time. Click on the legend to hide/show any category.
Hover the mouse over each dot to retrieve some details about that job.*""")

*The figure above shows the normalized CPU time used vs RAM used by DC2 jobs per job category.
Each dot represents one job. The CPU time is normalized to the CPU model named on the vertical axis and is aggregated over all the CPU slots the job requested at submission time. Click on the legend to hide/show any category.
Hover the mouse over each dot to retrieve some details about that job.*