# EPMT Query API

This workbook will illustrate the usage of the EPMT Query API. 

`EPMT` stores job/process/thread data and metadata in a database, be that a local file
or a database server. Access to the database is provided by the EPMT Query API, which
provides functions specifically tailored to the needs of a performance analyst. 

Data from the query functions can be returned in a variety of formats. The most
useful and recommended option is the `pandas` format, which returns a Pandas
dataframe. Other formats include `terse`, which provides a concise listing, and
`dict`, which returns a list of Python dictionaries. We also have a
powerful `orm` return format, which returns an ORM object that supports
lazy execution of SQL queries.

The rest of this notebook will walk you through the API calls, starting with
the highest-level job queries, followed by more granular operation-level queries,
followed by low-level process and thread queries. As you analyze your data, you will
likely find this hierarchical approach useful in tracking down performance issues.

## Table of Contents

 * [Getting documentation](#getting-docs)
 * [Importing data for the study](#import-data)
 * [Import module](#import-module)
 * [API function categories](#api-categories)
 * [Job Queries](#job-query)
   * [Output format and converting between formats](#output-formats)
   * [Working with ORM objects](#orm-objects)
   * [Job tags](#job-tags)
   * [Job status](#job-status)
   * [Ordering and filtering jobs](#jobs-order-filter)
   * [Failed jobs](#failed-jobs)
   * [Process sums](#proc-sums-field)
   
 * [Operations](#ops)
   * [Selecting processes in an operation](#select-op-procs)
   * [Operation primitive](#operation-primitive)
   * [`get_ops` API call](#get-ops)
   * [Aggregating operation metrics](#op-metrics)
   * [Data-movement v. useful work](#dm-ops)
   * [`get_op_metrics` grouped by tag](#group-by-tag)
   * [CPU-time v. duration](#cpu-time-v-duration)
   * [Operation costs v. total time](#ops-costs)
   * [Operation root(s) (ADVANCED TOPIC)](#op-roots)

 * [Process Queries](#process-query)
   * [Process tags](#process-tags)
     * [Unique process tags in job (ADVANCED TOPIC)](#job-proc-tags)
   * [Filter and ordering](#filter-processes)
   * [Thread metrics aggregation (ADVANCED TOPIC)](#thread-metrics-aggregation)
   * [Process tree and depth (ADVANCED TOPIC)](#proc-tree)
     * [Process tree walk](#process-tree-walk)
   * [Process roots](#proc-roots)
   * [Process histogram](#procs-histogram)
   * [Process Timeline](#timeline)
 * [Thread Query](#thread-query) 
 * [Useful Attributes of Job/Process/Threads](#useful-attributes)
 * [Case Study](#case-study)
 * [Modifying Job Metadata](#modify-job-metadata)
   * [Annotate jobs](#annotate-jobs)
   * [Analyze jobs](#analyze-jobs) 
 * [Deleting Jobs](#delete-jobs)
   * [Delete jobs based on their start/finish times](#delete-jobs-time-based)


## <a name="getting-docs">Getting to the docs</a>

The module functions have embedded documentation in the form of docstrings. You can access it, 
as you would do for any Python module/function:

To get help for all functions in the module, do `help(<module-name>)`:
```
help(eq)
```

To get documentation for a specific function, do something like:
```
help(eq.get_jobs)
```

On the command-line you can get a listing of API functions, by doing:
```
$ epmt help api
```

And, details on an individual function, by doing:
```
$ epmt help api <function-name>   # for example "epmt help api get_jobs"
```

<!-- ## <a name="import-data">Importing the data for this study</a>

This workbook relies on importing the following data. We use an sqlite database 
in this study, but you can use another database such as `postgresql`.
See the `preset_settings` folder to pick up a template of your choice and edit it
if needed. Save the template in the `epmt` folder as `settings.py`

While not required to do so, it's recommended that you start in a fresh database
so as not to affect your existing data. The sqlite database path is controlled
in `settings.py`, and is typically a file in the user `HOME`.

The tar files we need to import are in the `epmt` install area. 
```
# pick the database settings file of your choice
$ cp ../preset_settings/settings_sqlite_localfile.py settings.py

# backup your existing database
# The path might vary depending on your settings.py
# mv ~/EPMT_DB.sqlite ~/EPMT_DB.sqlite.backup

# now import the data
# First we figure out the EPMT install directory, and then import the .tgz from under it
$ EPMT_INSTALL_DIR="`epmt help| grep install_prefix| sed 's/papiex-//'| awk '{print $2}'`epmt"
$ epmt submit $EPMT_INSTALL_DIR/test/data/query_notebook/*.tgz

# check the list of imported jobs
$ epmt list
['625172', '627922', jobid_choice_1, '633144', '676007', '680181', '685000', '685016', '692544', '693147', '696127', '802954', '804285']
```

<a name="import-module"></a> -->

## <a name="api-categories">API Function Categories</a>

The API functions can be broadly put into a few categories based on the hierarchy of data they operate on: 

**Job-level queries**

These functions operate on individual jobs or collections of jobs. A job can be thought of
as what we submit to a batch system to execute. It's a collection of processes. Job-level
queries include `get_jobs`, `get_job_tags`, `are_jobs_comparable`, and `comparable_job_partitions`.


**Operation-level queries**

These functions operate on the level of operations. An operation can be considered as a collection of processes that share some common tags. A tag is a collection of key/value pairs (a dictionary, if you will), and we associate these tags with processes at the time of execution using the environment variable `PAPIEX_TAGS`.

Consider a process tagged with `'op:hsmget;op_instance:1;op_sequence:2'`
and another process tagged with `'op:hsmget;op_instance:2;op_sequence:5'`. In terms of the 
most granular operation, the two processes belong to different operations. However, if we consider the
higher-level operation `'op:hsmget'`, then the two processes belong to the same high-level operation.
So an operation is intimately tied to the tag defining it.

Examples of operation queries include `get_ops` and `get_op_metrics`.
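
To make the relationship between process tags and operation tags concrete, here is a minimal sketch in plain Python. It is not EPMT's implementation: the helper names `parse_tag` and `tag_matches` are hypothetical, and the sketch only illustrates the idea that a process belongs to an operation when its tag contains every key/value pair of the operation tag.

```python
def parse_tag(tag_str):
    """Parse 'k1:v1;k2:v2' into a dict (hypothetical helper, for illustration only)."""
    pairs = (item.split(':', 1) for item in tag_str.split(';') if item)
    return {key.strip(): value.strip() for key, value in pairs}


def tag_matches(proc_tag, op_tag):
    """Return True if every key/value pair of op_tag is present in proc_tag."""
    return all(proc_tag.get(k) == v for k, v in op_tag.items())


p1 = parse_tag('op:hsmget;op_instance:1;op_sequence:2')
p2 = parse_tag('op:hsmget;op_instance:2;op_sequence:5')

# Both processes belong to the coarse operation 'op:hsmget' ...
coarse = parse_tag('op:hsmget')
both_in_coarse = tag_matches(p1, coarse) and tag_matches(p2, coarse)

# ... but only the first belongs to the granular operation below.
granular = parse_tag('op:hsmget;op_instance:1;op_sequence:2')
only_first = tag_matches(p1, granular) and not tag_matches(p2, granular)

print(both_in_coarse, only_first)
```

The subset test is the design choice that makes a coarse tag select more processes than a granular one.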


**Process-level queries**

These functions operate on individual processes. A process consists of one or more threads. If a process
consists of a single thread, then the thread data is the process data. For multithreaded runs, the process
data is obtained by summation over the individual threads. Process-level queries tend to be more time-consuming, as the number of processes is many orders of magnitude greater than the number of jobs. Examples of process-level queries include `get_procs`, `job_proc_tags`, `conv_procs`, `rank_proc_tags_keys` and `timeline`.
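
As a rough illustration of how process data can be derived from thread data, the sketch below sums per-thread metrics key by key. The metric values and the helper `sum_threads` are hypothetical, and whether every EPMT metric is combined by plain summation is an assumption made only for this example.

```python
from collections import Counter

# Hypothetical per-thread metrics for one multithreaded process.
threads = [
    {'usertime': 1200, 'systemtime': 300, 'rchar': 4096},
    {'usertime': 800, 'systemtime': 100, 'rchar': 1024},
    {'usertime': 500, 'systemtime': 50, 'rchar': 0},
]


def sum_threads(thread_metrics):
    """Sum each metric across threads (illustrative, not EPMT code)."""
    total = Counter()
    for metrics in thread_metrics:
        total.update(metrics)  # Counter.update adds values key by key
    return dict(total)


process_metrics = sum_threads(threads)
print(process_metrics)
```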


**Thread-level queries**

At present we have only a single API call for this category: `get_thread_metrics`.


**Reference model calls**

A reference model is a collection of a limited number of jobs or operations. It is used to
serve as reference when determining whether other jobs or operations are inliers or outliers.
Examples of such calls include `create_refmodel`, `delete_refmodels` and `get_refmodels`. We
won't be discussing reference model API calls in this notebook. They will be discussed in detail
in the Outliers notebook.


## <a name="job-query">Job Queries</a>

`get_jobs` is the most often used job-level query. It queries the database to find jobs
matching some criteria. It usually takes a `tags` argument and returns a collection of jobs in the format specified by `fmt`. The returned list can be pruned and/or ordered using `fltr`, `limit` and `order`.

You can also pass in one or more jobs as a `jobs` parameter, most often for format conversion.

Let's get started! Import the required modules:

In [1]:
# import the query api module
import epmt_query as eq

# pandas and numpy modules will be needed in the examples below
import pandas as pd
import numpy as np

Now let's grab some data to play with.

In [2]:
# let's get jobs, we use the job tag to select the jobs
# jobs = eq.get_jobs(tags='exp_name:ESM4_historical_D151;exp_component:ocean_month_rho2_1x1deg',fmt='terse')
jobs = eq.get_jobs(limit=1000,before=-30,after=-35,fmt='terse')
len(jobs)
#jobs = eq.get_jobs(limit=1000,fmt='terse',before=-20)
#jobs=eq.get_jobs(before=-10,after=-13,fmt='orm')
#jobs.count()

1000

<a name="output-formats"></a>`fmt` can take one of the following values:
 * `terse` -- this returns a list of job ids, no data
 * `pandas` -- this returns a pandas dataframe, with each row representing a job
 * `dict` -- for a list of python dictionaries, with each dict representing a job
 * `orm` -- EPMT's database layer object for maximum flexibility and the speediest queries
 
`terse` and `orm` queries are the fastest, and should be used if you just want to count
the number of items, for instance. `pandas` and `dict` take longer, and contain a lot
more data. If you expect to do computations or vector operations, then you will prefer `pandas`.
If you want to iterate over a list, and deal only with ordinary python dicts, then choose
`dict`.

In [3]:
# above we got a list of job ids. sometimes we want to see more details
# than just the job id. We can use `conv_jobs` to convert between formats
# Or use `get_jobs` to get the specified format directly.
jobs_df = eq.conv_jobs(jobs, fmt='pandas')
display(jobs_df.columns.values)
print(len(jobs_df), 'jobs')
# show the first three rows
jobs_df[:3]

array(['updated_at', 'env_dict', 'tags', 'info_dict', 'env_changes_dict',
       'cpu_time', 'annotations', 'submit', 'analyses', 'jobid', 'start',
       'jobname', 'created_at', 'end', 'exitcode', 'duration', 'user',
       'rchar', 'syscr', 'syscw', 'wchar', 'cstime', 'cutime', 'majflt',
       'minflt', 'rssmax', 'cmajflt', 'cminflt', 'inblock', 'outblock',
       'usertime', 'num_procs', 'processor', 'starttime', 'vol_ctxsw',
       'guest_time', 'read_bytes', 'systemtime', 'time_oncpu',
       'timeslices', 'invol_ctxsw', 'num_threads', 'write_bytes',
       'time_waiting', 'all_proc_tags', 'delayacct_blkio_time',
       'cancelled_write_bytes'], dtype=object)

1000 jobs


Unnamed: 0,updated_at,env_dict,tags,info_dict,env_changes_dict,cpu_time,annotations,submit,analyses,jobid,...,systemtime,time_oncpu,timeslices,invol_ctxsw,num_threads,write_bytes,time_waiting,all_proc_tags,delayacct_blkio_time,cancelled_write_bytes
0,2023-05-06 01:06:03.986347,{'ENV': '/usr/local/Modules/5.1.1/init/profile...,{'exp_name': 'SPEAR_c96_o1_Hist_AllForc_IC1851...,"{'tz': 'EDT', 'status': {'exit_code': 0, 'exit...","{'PWD': '/xtmp/Fanrong.Zeng', 'OLDPWD': '/home...",64739707.0,{'EPMT_JOB_TAGS': 'exp_component:refineDiag;ex...,2023-05-04 14:04:00.799760,{'outlier_detection': [{'results': {'cpu_time'...,30459261,...,23239224,64801389023,239923,4816,0,112414720,6075555312,"[{'op': 'hsmget', 'op_instance': '1'}, {'op': ...",0,16384
1,2023-05-11 23:21:07.030482,{'ENV': '/usr/local/Modules/5.1.1/init/profile...,"{'exp_name': 'SM4p2_piControl_lamresoff_J', 'e...","{'tz': 'EDT', 'status': {'exit_code': 0, 'exit...","{'PWD': '/vftmp/Lori.Sentman', 'OLDPWD': '/hom...",383078321.0,{'EPMT_JOB_TAGS': 'exp_component:refineDiag;ex...,2023-05-04 14:08:00.584077,{'outlier_detection': [{'results': {'cpu_time'...,30459289,...,83746700,383165327651,452591,68473,0,33883619328,5684986583,"[{'op': 'hsmget', 'op_instance': '1'}, {'op': ...",0,2384990208
2,2023-05-06 01:06:02.627630,{'ENV': '/usr/local/Modules/5.1.1/init/profile...,{'exp_name': 'SPEAR_c96_o1_Hist_AllForc_IC1851...,"{'tz': 'EDT', 'status': {'exit_code': 0, 'exit...","{'PWD': '/xtmp/Fanrong.Zeng', 'OLDPWD': '/home...",65771016.0,{'EPMT_JOB_TAGS': 'exp_component:refineDiag;ex...,2023-05-04 14:10:55.343172,{'outlier_detection': [{'results': {'cpu_time'...,30459292,...,23072353,65835092952,239892,4746,0,112414720,5993332319,"[{'op': 'hsmget', 'op_instance': '1'}, {'op': ...",0,16384


In [4]:
# if you prefer dealing with python lists and dictionaries, set fmt='dict'. 
# We recommend using 'pandas' for its scalability and rich API.
# Notice we can use `get_jobs` to convert formats as well as `conv_jobs`
# Below we get a list of dictionaries (showing the first entry only)
eq.get_jobs(jobs = jobs, fmt='dict')[:1]

[{'updated_at': datetime.datetime(2023, 5, 6, 1, 6, 3, 986347),
  'env_dict': {'ENV': '/usr/local/Modules/5.1.1/init/profile.sh',
   'PWD': '/home/Fanrong.Zeng',
   'HOME': '/home/Fanrong.Zeng',
   'HOST': 'pp031',
   'LANG': 'en_US',
   'PATH': '/home/fms/local/opt/../epmt/4.9.2-centos-7/epmt-install/epmt:/usr/local/Modules/5.1.1/bin:/home/gfdl/an+pp/bin:/app/slurm/default/bin:/bin:/usr/bin',
   'USER': 'Fanrong.Zeng',
   'GROUP': 'sd',
   'SHELL': '/bin/tcsh',
   'SHLVL': '2',
   'OSTYPE': 'linux',
   'TMPDIR': '/xtmp/Fanrong.Zeng/job30459261',
   'VENDOR': 'unknown',
   'ARCHIVE': '/archive/Fanrong.Zeng',
   'ATW_BIN': '/home/atw/bin',
   'ATW_LIB': '/home/atw/lib',
   'LC_TIME': 'C',
   'LOGNAME': 'Fanrong.Zeng',
   'MANPATH': '/usr/local/Modules/5.1.1/share/man:/home/gfdl/man:/usr/local/man:/usr/share/man',
   'ATW_HOME': '/home/atw',
   'ATW_UTIL': '/home/atw/util',
   'BASH_ENV': '/usr/local/Modules/5.1.1/init/bash',
   'HOSTNAME': 'pp031',
   'HOSTTYPE': 'x86_64-linux',
   'MAC

### <a name="orm-objects">ORM queries</a>
There is a very useful format called `orm`, which optimizes queries
and lets you get the underlying Job (or Process) object directly.
ORMs evaluate queries lazily, so many intermediate ORM operations
return instantaneously. The SQL is actually fired only when an operation
absolutely needs the data.

We use [SQLAlchemy](https://www.sqlalchemy.org) for the ORM layer.

In [5]:
jobs_orm = eq.get_jobs(jobs, fmt='orm')
jobs_orm.count(), type(jobs_orm)

(1000, sqlalchemy.orm.query.Query)

`jobs_orm` above is a `Query` object. The `Query` object can be iterated
over (like a Python list). You can convert it to a list by using the slice
operator -- `[:]`.

In the above example, the query returns immediately, and the actual jobs
are never loaded from the database, since we never access them.
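
The idea of lazy evaluation can be sketched without a database. The toy `LazyQuery` class below is a hypothetical stand-in and not SQLAlchemy's `Query`; it only shows that building and refining a query does no work, and that the data source is touched once the result is iterated.

```python
class LazyQuery:
    """Toy stand-in for a lazily evaluated query (not SQLAlchemy's Query)."""

    def __init__(self, source, predicates=()):
        self._source = source
        self._predicates = tuple(predicates)

    def filter(self, predicate):
        # Builds a new query description; no rows are touched yet.
        return LazyQuery(self._source, self._predicates + (predicate,))

    def __iter__(self):
        # Rows are fetched only when the caller iterates.
        for row in self._source():
            if all(p(row) for p in self._predicates):
                yield row


fetches = []


def fetch_rows():
    fetches.append('fetch')  # record that the "database" was hit
    return [{'jobid': '1', 'exitcode': 0}, {'jobid': '2', 'exitcode': 1}]


q = LazyQuery(fetch_rows).filter(lambda r: r['exitcode'] != 0)
before = len(fetches)   # building the query did no work
failed = list(q)        # iteration triggers the fetch
after = len(fetches)
print(before, after, failed)
```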

### <a name="job-tags">Job Tags</a>

Each job has a `tags` field that is set by annotating the job with `EPMT_JOB_TAGS`. 
The job tag is stored as a dictionary of key/value pairs. The most common use of the job tag is for selecting
jobs. You can specify the tag either as a dictionary or as a string, with each key/value
pair separated by semicolons. All the key/value pairs must match for a job to be considered
a match.

You might recall that we mentioned tags in the context of operations. Those tags were associated
with individual processes, and are distinct from these job tags. However both are essentially
a dictionary of key/value pairs.

In [6]:
targ_exp_name_tag='exp_name:CM4_historical_c96_OM4p25_i251_v7c192initCond'
targ_exp_name_jobs = eq.get_jobs(tags=targ_exp_name_tag, fmt='orm')

In [7]:
targ_exp_name_jobs.count()

150

In [8]:
for j in targ_exp_name_jobs:
    print(j.jobid, j.tags)

30330030 {'exp_name': 'CM4_historical_c96_OM4p25_i251_v7c192initCond', 'exp_time': '2004', 'exp_target': 'prod-openmp', 'script_name': 'atw_atmos_ts_monthly_sfc_ocean.csh', 'exp_platform': 'gfdl.ncrc5-intel22', 'exp_component': 'analysis'}
30332454 {'exp_name': 'CM4_historical_c96_OM4p25_i251_v7c192initCond', 'exp_time': '20070101', 'exp_target': 'prod-openmp', 'script_name': 'CM4_historical_c96_OM4p25_i251_v7c192initCond_refineDiag_20070101', 'exp_platform': 'gfdl.ncrc5-intel22', 'exp_component': 'refineDiag', 'exp_seg_months': '12'}
30333361 {'exp_name': 'CM4_historical_c96_OM4p25_i251_v7c192initCond', 'exp_time': '20080101', 'exp_target': 'prod-openmp', 'script_name': 'CM4_historical_c96_OM4p25_i251_v7c192initCond_refineDiag_20080101', 'exp_platform': 'gfdl.ncrc5-intel22', 'exp_component': 'refineDiag', 'exp_seg_months': '12'}
30334423 {'exp_name': 'CM4_historical_c96_OM4p25_i251_v7c192initCond', 'exp_time': '20090101', 'exp_target': 'prod-openmp', 'script_name': 'CM4_historical_c96

As job queries rely on tags for filtering, you might want to know the *range* of values that each key in the job tags takes across a collection of jobs. We provide an API call for this purpose:

In [9]:
eq.get_job_tags(targ_exp_name_jobs)

{'exp_name': 'CM4_historical_c96_OM4p25_i251_v7c192initCond',
 'exp_time': {'2004',
  '20070101',
  '20080101',
  '2009',
  '20090101',
  '20100101',
  '20110101',
  '20120101',
  '20130101',
  '2014',
  '20140101'},
 'exp_target': 'prod-openmp',
 'script_name': {'CM4_historical_c96_OM4p25_i251_v7c192initCond_atmos_20090101',
  'CM4_historical_c96_OM4p25_i251_v7c192initCond_atmos_20140101',
  'CM4_historical_c96_OM4p25_i251_v7c192initCond_atmos_cmip_20090101',
  'CM4_historical_c96_OM4p25_i251_v7c192initCond_atmos_cmip_20140101',
  'CM4_historical_c96_OM4p25_i251_v7c192initCond_atmos_diurnal_20090101',
  'CM4_historical_c96_OM4p25_i251_v7c192initCond_atmos_diurnal_20140101',
  'CM4_historical_c96_OM4p25_i251_v7c192initCond_atmos_month_aer_20090101',
  'CM4_historical_c96_OM4p25_i251_v7c192initCond_atmos_month_aer_20140101',
  'CM4_historical_c96_OM4p25_i251_v7c192initCond_atmos_scalar_20090101',
  'CM4_historical_c96_OM4p25_i251_v7c192initCond_atmos_scalar_20140101',
  'CM4_historical_

The output says that among all the `CM4_historical_c96_OM4p25_i251_v7c192initCond` jobs, fields like `exp_seg_months` (scroll to bottom), `exp_target`, and `exp_platform` were single-valued. `component` ranged over a set of values -- `{'atmos', 'atmos_cmip', 'atmos_diurnal', 'atmos_month_aer', 'atmos_scalar', 'ice', 'ice_1x1deg', 'ice_daily', 'ice_daily_extra', ...}`.
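
The shape of this result (a scalar for single-valued keys, a set for keys with several values) can be reproduced with a few lines of plain Python. This is a sketch of the idea only: `tag_value_ranges` is a hypothetical helper, the tag values are made up, and nothing here reflects how `get_job_tags` is implemented.

```python
def tag_value_ranges(tag_dicts):
    """Collect the values each key takes (hypothetical helper, not EPMT code)."""
    seen = {}
    for tags in tag_dicts:
        for key, value in tags.items():
            seen.setdefault(key, set()).add(value)
    # Collapse single-valued keys to a scalar, keep sets for the rest.
    return {k: (next(iter(v)) if len(v) == 1 else v) for k, v in seen.items()}


# Made-up job tags for illustration.
job_tags = [
    {'exp_name': 'expA', 'exp_time': '2004', 'exp_target': 'prod-openmp'},
    {'exp_name': 'expA', 'exp_time': '20070101', 'exp_target': 'prod-openmp'},
    {'exp_name': 'expA', 'exp_time': '20080101', 'exp_target': 'prod-openmp'},
]
ranges = tag_value_ranges(job_tags)
print(ranges)
```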

### <a name="job-status">Job Status</a>

The job status contains information about the exit code and the signal received by the job.
It also contains some other useful information, such as the job script path.

The exit code, below, is taken as the return status of the first process after `epmt start`. 
Usually this would be the parent shell process. The exit code is 
**not the job status from the batch system**.

In [10]:
eq.get_job_status(targ_exp_name_jobs[0].jobid)

{'exit_code': 0,
 'exit_reason': 'none',
 'script_name': 'atw_atmos_ts_monthly_sfc_ocean.csh'}

### <a name="jobs-order-filter">Ordering and Filtering Jobs</a>

You can use the `order`, `limit`, and `fltr` options with `get_jobs` to sort and filter the job list.
It is advisable to use `limit` when possible, as it sends a `LIMIT` option to the SQL query
and saves database load time. Remember, the database may have millions of jobs, so an unbounded `get_jobs` query with no filtering and no limits may exhaust the physical memory on the system. 

When you need to use unbounded queries, try to use the `orm` format, so you can run a `count` on the
returned ORM object prior to loading the data from the database.
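
The benefit of counting first and bounding the result can be seen with Python's built-in `sqlite3` module. The sketch below uses a throwaway in-memory table; the table name, columns, and data are hypothetical and are not EPMT's schema.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE jobs (jobid TEXT PRIMARY KEY, duration REAL)')
conn.executemany(
    'INSERT INTO jobs VALUES (?, ?)',
    [(str(i), float(i) * 1e6) for i in range(1, 10001)],
)

# Counting is cheap: only a single integer crosses the database boundary.
(total,) = conn.execute('SELECT COUNT(*) FROM jobs').fetchone()

# LIMIT bounds how many rows are actually loaded into memory.
top5 = conn.execute(
    'SELECT jobid, duration FROM jobs ORDER BY duration DESC LIMIT 5'
).fetchall()

print(total, [jobid for jobid, _ in top5])
conn.close()
```

Checking the count before fetching lets you decide whether a query is safe to load in full or needs a `limit`.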

In the examples below, we use the `duration` and `exitcode` fields from the `Job` model. 

In [11]:
# some other useful queries might be, for instance, to order the jobs
# by duration (longest first); add `limit=5` to get only the top 5
df = eq.get_jobs(jobs, order=eq.desc(eq.Job.duration), fmt="pandas")
df[['jobid', 'tags', 'duration', 'exitcode']]

Unnamed: 0,jobid,tags,duration,exitcode
0,30461350,{'exp_name': 'LM4p0_C384_OM4p25_pi_1990CO2_She...,3.481977e+10,0
1,30462895,{'exp_name': 'c96L33_am5a0_new_rad_diagonly_em...,2.936197e+10,0
2,30460368,{'exp_name': 'CM4_noBling_1990Control_c96_OM4p...,2.296954e+10,0
3,30459335,{'exp_name': 'c96L33_am4p0_QTsat20_calipso_lea...,1.662544e+10,0
4,30459711,{'exp_name': 'LM4p0_C96_pi_1850CO2_Sheffield_s...,1.435104e+10,0
...,...,...,...,...
995,30462164,{},3.654318e+06,0
996,30462040,{},3.240684e+06,0
997,30462165,{},3.232943e+06,0
998,30462166,{},3.228576e+06,0


<a name="failed-jobs"></a>Let's figure out which jobs, if any, failed.

In [12]:
eq.get_jobs(jobs_orm, fltr=(eq.Job.exitcode != 0), fmt='terse')

[]

So, none of the jobs had a non-zero exit code.

### <a name="proc-sums-field">Aggregation across job processes</a>

Each job object has a `proc_sums` field that aggregates data across the
processes of the job. The field itself is a dictionary of key/value pairs.
It is an attribute of the Job object, and when converting from `orm`
to the other formats, the key/value pairs of the dictionary are made available
as top-level fields of the `dict` or `pandas` dataframe:

In [13]:
j = jobs_orm.first()
sorted(j.proc_sums.keys())

['all_proc_tags',
 'cancelled_write_bytes',
 'cmajflt',
 'cminflt',
 'cstime',
 'cutime',
 'delayacct_blkio_time',
 'guest_time',
 'inblock',
 'invol_ctxsw',
 'majflt',
 'minflt',
 'num_procs',
 'num_threads',
 'outblock',
 'processor',
 'rchar',
 'read_bytes',
 'rssmax',
 'starttime',
 'syscr',
 'syscw',
 'systemtime',
 'time_oncpu',
 'time_waiting',
 'timeslices',
 'usertime',
 'vol_ctxsw',
 'wchar',
 'write_bytes']

Now, the fields shown above become available in other formats (`dict` and `pandas`) as top-level fields, while the `proc_sums`
field itself is masked.

In [14]:
j_df = eq.get_jobs(j, fmt='pandas')
sorted(j_df.columns.values)

['all_proc_tags',
 'analyses',
 'annotations',
 'cancelled_write_bytes',
 'cmajflt',
 'cminflt',
 'cpu_time',
 'created_at',
 'cstime',
 'cutime',
 'delayacct_blkio_time',
 'duration',
 'end',
 'env_changes_dict',
 'env_dict',
 'exitcode',
 'guest_time',
 'inblock',
 'info_dict',
 'invol_ctxsw',
 'jobid',
 'jobname',
 'majflt',
 'minflt',
 'num_procs',
 'num_threads',
 'outblock',
 'processor',
 'rchar',
 'read_bytes',
 'rssmax',
 'start',
 'starttime',
 'submit',
 'syscr',
 'syscw',
 'systemtime',
 'tags',
 'time_oncpu',
 'time_waiting',
 'timeslices',
 'updated_at',
 'user',
 'usertime',
 'vol_ctxsw',
 'wchar',
 'write_bytes']
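
The promotion of `proc_sums` keys to top-level fields can be pictured as a simple dictionary flattening. The sketch below is illustrative only: `flatten_proc_sums` is a hypothetical helper, the job record is made up, and EPMT's actual conversion code may differ.

```python
def flatten_proc_sums(job_record):
    """Promote proc_sums keys to top-level fields (illustrative only)."""
    flat = {k: v for k, v in job_record.items() if k != 'proc_sums'}
    flat.update(job_record.get('proc_sums', {}))
    return flat


job = {
    'jobid': '12345',  # hypothetical job id
    'duration': 3.2e9,
    'proc_sums': {'num_procs': 2584, 'rchar': 81644231642, 'usertime': 5024904869},
}
flat_job = flatten_proc_sums(job)
print(sorted(flat_job))
```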

## <a name="ops">Operations</a>

As a job may span over 50K processes, it's often helpful to select processes belonging to a particular operation. The selection is done using tags, sets of key/value pairs.
In the general case, the collection of processes in an operation forms a **forest**. In the degenerate case of a single root process, the forest will contain a single tree.

### <a name="select-op-procs">Selecting processes in an operation</a>

We can select the processes in an operation by passing a tag to `get_procs`.
You may limit the selection to a single job or multiple jobs using the
`jobs` parameter to `get_procs`.

In [15]:
# below we use the ORM format as we just want a count on the number of processes in the operation
hsmget_op_procs = eq.get_procs(jobs, tags='op:hsmget', fmt='orm')
hsmget_op_procs.count()

3010084

### <a name="operation-primitive">The Operation primitive</a>

Using `get_procs` with a tag to select processes in an operation is somewhat
clumsy. The EPMT Query API defines an **Operation** primitive. The `Operation`
API call is passed one or more jobs, and a `tag`. Internally, it calls `get_procs`.
By using the `Operation` primitive, you get aggregated metrics across the
processes constituting the operation in a `proc_sums` attribute. You can specify a granular
operation such as `{'op': 'timavg', 'op_instance': 100, 'op_sequence': 5 }`, or a more
coarse operation, such as `{'op': 'timavg'}`. The important thing to understand is that
all the processes that constitute the operation will share *ALL* the keys/values of the tag.

In [16]:
# you could also select a more granular operation with a more
# specific, tag, such as {'op': 'hsmget', 'op_instance': 10, 'op_sequence': 25}
# to keep this quick, we'll limit it to 100 jobs' worth of hsmget calls from the jobs we grabbed earlier
jobs = eq.get_jobs(jobs=jobs, limit=100,fmt='terse')
op = eq.Operation(jobs, {'op': 'hsmget'})
(op.tags, op.processes.count(), op.proc_sums)

({'op': 'hsmget'},
 338561,
 {'rssmax': 1981459228,
  'write_bytes': 570574307328,
  'starttime': 704029036421120000,
  'majflt': 7085,
  'delayacct_blkio_time': 0,
  'minflt': 368312409,
  'cminflt': 1092327655,
  'vol_ctxsw': 11373385,
  'timeslices': 12639897,
  'time_waiting': 760840885371,
  'usertime': 5024904869,
  'syscw': 4970189,
  'num_procs': 338561,
  'cutime': 22393230000,
  'inblock': 600305,
  'wchar': 570573197072,
  'time_oncpu': 6742622251412,
  'cancelled_write_bytes': 6078464,
  'processor': 0,
  'cmajflt': 11927,
  'cpu_time': 6737386439.0,
  'syscr': 13439102,
  'read_bytes': 307417600,
  'num_threads': 0,
  'invol_ctxsw': 872610,
  'numtids': 392879,
  'systemtime': 1712481570,
  'cstime': 6413810000,
  'duration': 79087149976.0,
  'outblock': 1114402944,
  'guest_time': 0,
  'rchar': 81644231642})

### <a name='get-ops'>`get_ops` API call</a>
We also have an API call `get_ops` (much like `get_jobs` and `get_procs`) that supports querying for a collection of operations, and multiple output formats, selected using the option -- `fmt`.

In [17]:
dm_ops = eq.get_ops(jobs, tags = ['op:hsmget', 'op:dmput', 'op:cp', 'op:rm', 'op:mv', 'op:untar'], fmt='pandas')
pd.set_option('display.max_colwidth', 150)
dm_ops[['tags','proc_sums', 'start', 'finish', 'duration']]

Unnamed: 0,tags,proc_sums,start,finish,duration
0,{'op': 'hsmget'},"{'rssmax': 1981459228, 'write_bytes': 570574307328, 'starttime': 704029036421120000, 'majflt': 7085, 'delayacct_blkio_time': 0, 'minflt': 36831240...",2023-05-04 18:04:09.447189,2023-05-04 19:36:16.113445,79087150000.0
1,{'op': 'dmput'},"{'rssmax': 11875508, 'write_bytes': 8380416, 'starttime': 3580894371840000, 'majflt': 282, 'delayacct_blkio_time': 0, 'minflt': 2710251, 'cminflt'...",2023-05-04 18:24:53.372775,2023-05-04 23:00:13.658708,2029277000.0
2,{'op': 'cp'},"{'rssmax': 856933868, 'write_bytes': 47677440, 'starttime': 31201979269120000, 'majflt': 1217, 'delayacct_blkio_time': 0, 'minflt': 77669011, 'cmi...",2023-05-04 18:25:24.831008,2023-05-04 22:18:51.204836,11837100000.0
3,{'op': 'rm'},"{'rssmax': 41718968, 'write_bytes': 60526592, 'starttime': 18341500846080000, 'majflt': 31, 'delayacct_blkio_time': 0, 'minflt': 15404733, 'cminfl...",2023-05-04 18:24:02.796579,2023-05-04 23:00:13.069348,393796300.0
4,{'op': 'mv'},"{'rssmax': 3509233024, 'write_bytes': 227479552, 'starttime': 137561903226880000, 'majflt': 6106, 'delayacct_blkio_time': 0, 'minflt': 325742351, ...",2023-05-04 18:24:06.843994,2023-05-04 23:00:13.049869,47964580000.0
5,{'op': 'untar'},"{'rssmax': 581572, 'write_bytes': 37847965696, 'starttime': 220967280640000, 'majflt': 33, 'delayacct_blkio_time': 0, 'minflt': 196177, 'cminflt':...",2023-05-04 18:25:23.216403,2023-05-04 22:18:46.916176,171713500.0


The dataframe contains one row per `tag` (or operation). `proc_sums` contains aggregate metrics across the underlying processes constituting the operation.

If you think of an operation as a collection of processes sharing a tag, you will realize that an operation
need not have a single `start` and `finish`. Instead, an operation may have many `start` and `finish` times, each pair constituting an `interval`. By default, to reduce computational complexity, `get_ops` does not determine
all the intervals of an operation. However, if you pass the option `full = True`, `get_ops` returns
additional fields such as `intervals`, a sequence of tuples, with each tuple representing a single
interval in the form (`start_time`, `finish_time`). The set of intervals of an operation determines all the
periods of execution of the operation. A boolean field, `contiguous`, indicates whether the
operation executed over a single interval or over multiple intervals.

In [18]:
ops = eq.get_ops(jobs, tags = ['op:hsmget'], full=True, fmt='pandas')
ops[['jobs', 'tags', 'intervals', 'start', 'finish', 'contiguous', 'duration', 'proc_sums', 'num_runs', 'processes']]

Unnamed: 0,jobs,tags,intervals,start,finish,contiguous,duration,proc_sums,num_runs,processes
0,"[30459261, 30459289, 30459292, 30459326, 30459335, 30459336, 30459337, 30459338, 30459339, 30459340, 30459341, 30459342, 30459343, 30459344, 30459...",{'op': 'hsmget'},"((2023-05-04 18:04:09.447189, 2023-05-04 18:04:53.561729), (2023-05-04 18:08:15.297657, 2023-05-04 18:20:16.259824), (2023-05-04 18:20:31.605689, ...",2023-05-04 18:04:09.447189,2023-05-04 19:36:16.113445,False,79087150000.0,"{'rssmax': 1981459228, 'write_bytes': 570574307328, 'starttime': 704029036421120000, 'majflt': 7085, 'delayacct_blkio_time': 0, 'minflt': 36831240...",17,"[{'info_dict': None, 'tags': {'op': 'hsmget', 'op_instance': '1'}, 'path': '/home/fms/local/perlbrew/perls/perl-5.36.0/bin/perl', 'exitcode': 0, '..."


As you can see, `contiguous` is `False`: the `hsmget` operation executed over many distinct intervals, with no overlap. This means that between `hsmget` invocations, other processes (with different tags) executed.
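
One common way to derive such intervals is to merge the overlapping `(start, finish)` spans of the individual processes. The sketch below shows that standard interval merge on made-up numbers; it is an assumption about the general technique, not a description of how `get_ops` computes `intervals` or `contiguous`.

```python
def merge_intervals(spans):
    """Merge overlapping (start, finish) spans into disjoint intervals."""
    merged = []
    for start, finish in sorted(spans):
        if merged and start <= merged[-1][1]:
            # Overlaps (or touches) the previous interval: extend it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], finish))
        else:
            merged.append((start, finish))
    return merged


# Hypothetical process spans, in seconds since job start.
proc_spans = [(0, 10), (5, 12), (30, 40), (38, 45), (100, 110)]
intervals = merge_intervals(proc_spans)
contiguous = len(intervals) == 1
print(intervals, contiguous)
```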

### <a name="op-metrics">Aggregating operation metrics</a>

The `Operation` primitive provides an easy way to obtain aggregates on metrics across
processes in an operation. Before `Operation`, the way to obtain metrics was to
use the `get_op_metrics` API call:

In [19]:
# widen the column display width to show the full tag
pd.set_option('display.max_colwidth', 200)

# get the operations with the top cpu_time summed across all processes. 
# Note, cpu_time is a better measure of time spent in an operation than 
# 'duration', which might end up double-counting, as in a 
# parent-child process scenario, where the parent waits on the child.
jobid_choice_1='30329216'
jobid_choice_2='30330030'
jobid_choice_3='30330333'
ops_df = eq.get_op_metrics([jobid_choice_1, jobid_choice_2], fmt='pandas').sort_values(by='cpu_time', ascending=False)
print(ops_df.count())
ops_df[['jobid', 'tags', 'duration', 'cpu_time']][:10]


rssmax                   94
write_bytes              94
starttime                94
majflt                   94
delayacct_blkio_time     94
minflt                   94
cminflt                  94
vol_ctxsw                94
timeslices               94
time_waiting             94
usertime                 94
syscw                    94
cutime                   94
inblock                  94
wchar                    94
time_oncpu               94
cancelled_write_bytes    94
processor                94
cmajflt                  94
syscr                    94
read_bytes               94
num_threads              94
invol_ctxsw              94
systemtime               94
cstime                   94
outblock                 94
guest_time               94
rchar                    94
job                      94
jobid                    94
tags                     94
num_procs                94
numtids                  94
cpu_time                 94
duration                 94
dtype: int64


Unnamed: 0,jobid,tags,duration,cpu_time
82,30329216,"{'op': 'timavg', 'op_instance': '1'}",12845150000.0,3629804000.0
5,30329216,"{'op': 'hsmget', 'op_instance': '1'}",19192000000.0,2382821000.0
17,30329216,"{'op': 'mv', 'op_instance': '21'}",12685090000.0,1958200000.0
52,30329216,"{'op': 'ncrcat', 'op_instance': '15'}",2454646000.0,1670048000.0
81,30329216,"{'op': 'splitvars', 'op_instance': '2'}",4327979000.0,1507980000.0
54,30329216,"{'op': 'ncrcat', 'op_instance': '2'}",1893357000.0,1410476000.0
0,30329216,"{'op': 'cp', 'op_instance': '3'}",6228548000.0,735006100.0
93,30329216,"{'op': 'timavg', 'op_instance': '9'}",1728144000.0,643279200.0
90,30329216,"{'op': 'timavg', 'op_instance': '3'}",1463522000.0,632811700.0
92,30329216,"{'op': 'timavg', 'op_instance': '7'}",1425365000.0,626141800.0


#### <a name="dm-ops">Data movement operations</a>
The above call was slow to execute and resulted in a lot of operations. The `get_op_metrics` call can take a 
list of tags if one knows the operations one cares about. Pruning with the `tags` argument speeds up
the call significantly. Let's figure out the time spent
in data-movement operations v. useful work.
In the call to `get_op_metrics` below, we pass in the *list of tags* that
represent the data-movement operations. As it's a list of tags, it's like
an OR-operation with the tags.

In [20]:
dm_tags = ['op:hsmget', 'op:cp', 'op:dmget', 'op:gcp', 'op:mv', 'op:untar', 'op:tar', 'op:rm']
dm_ops_df = eq.get_op_metrics(jobs, tags = dm_tags)
# only showing the first 15 rows
dm_ops_df[['jobid', 'tags', 'cpu_time', 'duration', 'num_procs']][:15]

Unnamed: 0,jobid,tags,cpu_time,duration,num_procs
0,30459261,{'op': 'hsmget'},25252328.0,151837100.0,2584
1,30459289,{'op': 'hsmget'},245633566.0,4167971000.0,6474
2,30459292,{'op': 'hsmget'},25479575.0,154809500.0,2584
3,30459326,{'op': 'hsmget'},29203049.0,219338200.0,2909
4,30459335,{'op': 'hsmget'},41197640.0,361380100.0,3680
5,30459336,{'op': 'hsmget'},122173336.0,1137542000.0,9250
6,30459337,{'op': 'hsmget'},10365006.0,78066510.0,920
7,30459338,{'op': 'hsmget'},10507524.0,87323250.0,920
8,30459339,{'op': 'hsmget'},10336843.0,102583100.0,920
9,30459340,{'op': 'hsmget'},4131200.0,27326700.0,295


While the query above helps, we would like it to aggregate across jobs by tag. This
is easily accomplished by passing the <a name="group-by-tag">`group_by_tag`</a> 
argument to `get_op_metrics`:

In [21]:
dm_ops_df_grouped = eq.get_op_metrics(jobs, tags = dm_tags, group_by_tag = True)
dm_ops_df_grouped[['tags', 'cpu_time', 'duration', 'num_procs']]

Unnamed: 0,tags,cpu_time,duration,num_procs
0,{'op': 'cp'},1862087000.0,11837100000.0,36032
1,{'op': 'hsmget'},6737386000.0,79087150000.0,338561
2,{'op': 'mv'},6899574000.0,47964580000.0,154407
3,{'op': 'rm'},278172300.0,393796300.0,14442
4,{'op': 'untar'},54784830.0,171713500.0,160
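
Conceptually, `group_by_tag` collapses the per-job rows into one row per tag by summing the metrics. The sketch below mimics that idea in plain Python. The rows are made-up values and `group_rows_by_tag` is a hypothetical helper written for this illustration; it is not how EPMT implements the option.

```python
from collections import defaultdict

# Made-up per-job rows, shaped like the output of get_op_metrics (values are illustrative)
rows = [
    {'jobid': 'job-a', 'tags': {'op': 'hsmget'}, 'cpu_time': 100.0, 'duration': 400.0, 'num_procs': 10},
    {'jobid': 'job-b', 'tags': {'op': 'hsmget'}, 'cpu_time': 50.0, 'duration': 150.0, 'num_procs': 4},
    {'jobid': 'job-a', 'tags': {'op': 'cp'}, 'cpu_time': 20.0, 'duration': 30.0, 'num_procs': 2},
]

def group_rows_by_tag(rows, metrics=('cpu_time', 'duration', 'num_procs')):
    """Sum the given metrics across jobs, producing one row per distinct tag."""
    sums = defaultdict(lambda: dict.fromkeys(metrics, 0))
    for row in rows:
        # dicts are not hashable, so use a sorted tuple of items as the grouping key
        key = tuple(sorted(row['tags'].items()))
        for m in metrics:
            sums[key][m] += row[m]
    return [{'tags': dict(key), **vals} for key, vals in sorted(sums.items())]

grouped = group_rows_by_tag(rows)
for g in grouped:
    print(g)
```

The `jobid` column disappears in the grouped result, which matches the grouped table above: once rows are summed across jobs, a single job id no longer applies.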


So, the total cpu time spent in all data-movement operations can be calculated easily.

In [22]:
dm_ops_df_grouped['cpu_time'].sum()/1e6

15832.005057

In [23]:
# total cpu time spent in the jobs
# here we want to get the total cpu time of the job without any operation filtering
s = 0
for j in jobs_orm: s += j.cpu_time
s/1e6

601031.409144

In [24]:
round((100*__/_), 2)

2.63
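
The `_` and `__` names in the cell above are IPython shortcuts for the last and second-to-last outputs, so the result depends on the cells being run in exactly this order. A more explicit version of the same arithmetic, using the two values printed above, might look like this (the variable names are ours, chosen for the illustration):

```python
# Values copied from the outputs of the two preceding cells (microseconds / 1e6 = seconds)
dm_cpu_time = 15832.005057       # cpu time spent in data-movement operations
jobs_cpu_time = 601031.409144    # total cpu time across all jobs

# Share of the total cpu time that went to data movement, as a percentage
dm_percent = round(100 * dm_cpu_time / jobs_cpu_time, 2)
print(dm_percent)
```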

#### <a name="cpu-time-v-duration">cpu time v. duration</a>
So, the data-movement operations take about `2.6%` of the total cpu time across our jobs.
There is a reason we used `cpu_time` (a.k.a. exclusive cpu time) for our calculation
rather than `duration`. While we have an accurate
value for `duration` for a job (wall-clock time of the job script), when we attempt to
determine the `duration` of an operation, we run the risk of double-counting `duration`.

How can that happen? 

Suppose a process forks and waits for a child to terminate. The `duration` (wall-clock
time) will be counted for both the parent process and the child process.
Since the `get_op_metrics` call selects processes using tags, it will select both the parent and
child processes for the operation and sum their durations, not realizing that in some
cases the parent may have been idle in the background, waiting for the child to complete (for example,
a shell waiting for a spawned process to complete before resuming execution).
`cpu_time`, on the other hand, is the actual time spent by a process on the cpu, and
cannot be counted twice in such a scenario.

As an exercise, let's do the above computation with `duration`.

In [25]:
dm_ops_df_grouped['duration'].sum()/1e6

139454.340006

In [26]:
jobs_duration = 0
for j in jobs_orm: jobs_duration += j.duration
print(f'jobs_duration={jobs_duration/1e6}')
jobs_duration/1e6 > dm_ops_df_grouped['duration'].sum()/1e6

jobs_duration=1183071.344615


True

If the above evaluates to `True`, the cumulative duration of the data-movement operations is less than the job duration itself, which is a desirable outcome for a scientific workflow.

If the above evaluates to `False`, the cumulative duration of the data-movement operations is showing up as higher than the job duration. That's ridiculous! It can happen because we double-counted the `duration` of processes that were waiting in the background.

Regardless of the outcome, avoiding this double-counting is easy with the `op_duration_method` argument of `get_op_metrics`. The trade-off is a slower function call, so this option is disabled by default. To prevent double-counting of duration, we pass `op_duration_method = 'sum-minus-overlap'` to `get_op_metrics`.

In [27]:
dm_ops_df_grouped = eq.get_op_metrics(jobs, tags = dm_tags, group_by_tag = True, op_duration_method = 'sum-minus-overlap')
dm_ops_df_grouped[['tags', 'cpu_time', 'duration', 'num_procs']]

Unnamed: 0,tags,cpu_time,duration,num_procs
0,{'op': 'cp'},1862087000.0,5747539000.0,36032
1,{'op': 'hsmget'},6737386000.0,13453400000.0,338561
2,{'op': 'mv'},6899574000.0,22934790000.0,154407
3,{'op': 'rm'},278172300.0,393796300.0,14442
4,{'op': 'untar'},54784830.0,171713500.0,160


In [28]:
print(f'dm_ops_df_grouped sum = {dm_ops_df_grouped["duration"].sum()/1e6}, jobs_duration = {jobs_duration/1e6}')
jobs_duration/1e6 > dm_ops_df_grouped['duration'].sum()/1e6

dm_ops_df_grouped sum = 42701.236509, jobs_duration = 1183071.344615


True

If your outcome before was `False`, hopefully you now see that the new `duration` sum is smaller than the old one and your final outcome is `True`. Tell someone on the EPMT team if it's not!

If your outcome before was `True`, you may see the exact same numbers and outcome, which is not a bad thing!
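
To make the double-counting concrete, here is a minimal sketch of the idea behind subtracting overlap: treat each process as a `(start, end)` interval and measure the union of the intervals rather than the sum of their lengths. The `naive_duration` and `merged_duration` helpers and the sample intervals are illustrative assumptions; EPMT's actual `sum-minus-overlap` computation may differ in detail.

```python
def naive_duration(intervals):
    """Sum the length of every interval, counting overlapping time more than once."""
    return sum(end - start for start, end in intervals)

def merged_duration(intervals):
    """Total time covered by at least one interval (overlaps are counted once)."""
    total = 0
    current_start = current_end = None
    for start, end in sorted(intervals):
        if current_end is None or start > current_end:
            # disjoint from the running interval: bank it and start a new one
            if current_end is not None:
                total += current_end - current_start
            current_start, current_end = start, end
        else:
            # overlaps the running interval: extend it if needed
            current_end = max(current_end, end)
    if current_end is not None:
        total += current_end - current_start
    return total

# A shell (0-100) that waits on its child (10-90), plus an unrelated later process (120-150)
procs = [(0, 100), (10, 90), (120, 150)]
print(naive_duration(procs))   # 100 + 80 + 30
print(merged_duration(procs))  # 100 + 30
```

The child's 80 time units fall entirely inside the shell's interval, so the naive sum counts them twice while the merged total does not.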

### <a name="ops-costs">Operation metrics as a fraction of cumulative job metrics</a>

You might be wondering if there's a quicker way to do the data-movement computation above, and there is!

We have an API call, `ops_costs`, that determines the value of a metric for an operation as a fraction of the value of that metric for the entire job. The metric can be `cpu_time` or `duration`. The call automatically ensures that the `duration` of overlapping processes is not double-counted.

In [29]:
# the metric can be `duration` or `cpu_time`; here we use cpu_time
(dm_percent, dm_ops_df, jobs_cpu_time, dm_agg_df_by_job) = eq.ops_costs(jobs, tags = dm_tags, metric = 'cpu_time')
dm_percent

KeyError: 'cpu_time'

In [None]:
# only showing the first 15 rows
dm_ops_df[['jobid', 'tags', 'num_procs', 'cpu_time']][:15]

In [None]:
# this shows the costs aggregated by jobs (across tags)
dm_agg_df_by_job

The table above shows the percentage of `cpu_time` spent in data-movement operations by job. While most of the jobs are in the `30-40%` range, the first job is an outlier. This might be because, being the first in the series, it needs its data to be fetched from long-term storage, while the jobs following it find their data in the cache.

### <a name="op-roots">Operation Root(s) (ADVANCED TOPIC)</a>

Sometimes we are interested in knowing the root process(es) that spawned the processes
constituting an operation, or more simply, "which processes started an operation". `op_roots` takes
a job (or collection of jobs) and an operation tag as parameters, and returns a collection
of processes that are the root process(es) for the operation.

In [None]:
# we could also give a list of jobs 

op_roots = eq.op_roots([jobid_choice_2], 'op:hsmget', fmt='pandas')
op_roots.sort_values('duration', ascending=False)[['tags', 'exename', 'args', 'pid', 'ppid', 'duration']]


As you can see, a number of root processes for the `hsmget` operation were identified.
Here, a root process of an operation is defined as a process whose parent does not share
the same operation `tag`. In other words, the processes listed above had parents that
were not tagged with the `hsmget` tag.
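
The definition of an operation root can be sketched in a few lines of plain Python. The process table and the `find_op_roots` helper below are hypothetical and exist only to illustrate the rule; they are not the `op_roots` implementation.

```python
# Hypothetical processes keyed by pid; 'op' stands in for the operation tag
procs = {
    100: {'ppid': 1,   'op': None},       # job shell, untagged
    200: {'ppid': 100, 'op': 'hsmget'},   # parent is untagged, so this is a root of hsmget
    201: {'ppid': 200, 'op': 'hsmget'},   # parent shares the tag, so this is not a root
    300: {'ppid': 100, 'op': 'hsmget'},   # a second root of hsmget
    400: {'ppid': 100, 'op': 'cp'},       # belongs to a different operation
}

def find_op_roots(procs, op):
    """Return pids tagged with `op` whose parent is missing or not tagged with `op`."""
    roots = []
    for pid, proc in procs.items():
        if proc['op'] != op:
            continue
        parent = procs.get(proc['ppid'])
        if parent is None or parent['op'] != op:
            roots.append(pid)
    return sorted(roots)

print(find_op_roots(procs, 'hsmget'))
```

Note that an operation can have several roots, as `hsmget` does here, because the same tag may be applied to independent subtrees of the job.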

## <a name="process-query">Process Queries</a>

A process query returns a collection of one or more processes. The query is
passed a `jobs` parameter to restrict the process set to those that belong to a
specified set of `jobs`. 

Like the job query, the process query can take `tags`, `fmt`,
`fltr`, `order` and `limit` to filter and format the output. `order` and `limit` become
particularly important in process queries, as each job can have thousands of processes,
and those take time to load from the database. In the same vein, using `fmt='orm'` is a good
idea in process queries, as it minimizes the database overhead in certain cases.

By default, with `order` unset, the ordering of processes in the returned collection
is ORM and database dependent.

In [None]:
# If you want to get the processes belonging to a job
# here each row in the pandas dataframe contains one job process
# again, you can use the 'terse' fmt option to get just the list of database ids of the processes
procs = eq.get_procs([jobid_choice_2], fmt='pandas')
display(sorted(procs.columns.values))
# remove columns with only None values and see the first 10 rows
procs.replace(to_replace=[None], value=np.nan).dropna(axis=1,how="all")[:10]

You could also pass in more than one job, in which case the returned processes
would be the union of the processes across the jobs list. Here we use the `orm` format
to speed up the query, since we just want a `count` of processes.

In [None]:
procs = eq.get_procs([jobid_choice_1, jobid_choice_2], fmt='orm')
procs.count()
#procs = eq.get_procs([jobid_choice_1, jobid_choice_2], fmt='pandas')
#procs

### <a name="process-tags">Process Tags</a>

Each process saves a dictionary of key/value pairs, such as:
`{'op': "ncatted", 'op_instance': 12, 'op_sequence': 159}`

As mentioned before, process tags are commonly used to select the processes that constitute an **operation**, using the `tags` option.

In [None]:
# below we get the processes in an operation that is identified by 'op:cp;op_instance:36'
op = eq.get_procs(jobs, tags='op:cp;op_instance:36', fmt='pandas')

# how many procs did we grab? 
print(len(op))

# print out the process at the top of the pandas df
op.iloc[0]
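
The tag string used above is a compact form of a dictionary. As a rough sketch of how such a string relates to the tag dictionary, and of how tag matching behaves, consider the helpers below. `parse_tag_string`, `matches` and `matches_any` are hypothetical names for this illustration, and the subset-matching rule is an assumption about the query semantics rather than a copy of EPMT's code.

```python
def parse_tag_string(tag):
    """Turn 'op:cp;op_instance:36' into {'op': 'cp', 'op_instance': '36'}."""
    result = {}
    for pair in tag.split(';'):
        key, _, value = pair.partition(':')
        result[key.strip()] = value.strip()
    return result

def matches(proc_tags, wanted):
    """True if every key/value in `wanted` is present in the process tags."""
    return all(proc_tags.get(k) == v for k, v in wanted.items())

def matches_any(proc_tags, wanted_list):
    """A list of tags behaves like an OR: matching any one of them is enough."""
    return any(matches(proc_tags, w) for w in wanted_list)

# Tag values are kept as strings here, as in the dataframe outputs shown earlier
proc_tags = {'op': 'cp', 'op_instance': '36', 'op_sequence': '159'}
wanted = parse_tag_string('op:cp;op_instance:36')
print(wanted)
print(matches(proc_tags, wanted))
print(matches_any(proc_tags, [parse_tag_string(t) for t in ['op:hsmget', 'op:cp']]))
```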

#### <a name="job-proc-tags">Unique process tags in a job (ADVANCED TOPIC)</a>

We can determine the unique set of process tags across all the processes in a job (or collection of jobs) using the
`job_proc_tags` API call.

In [None]:
# suppose you want to figure out the unique set of operations
# across all the jobs of interest. We would pass in our list of jobs
# showing the first 15 entries
eq.job_proc_tags(jobs_orm)[:15]

### <a name="filter-processes">Filtering and Ordering Processes</a>

`order`, `limit` and `fltr` should be used where possible to reduce query time.

In [None]:
# now let's say we care about a particular operation. 
# Let's find the processes in the operation, and
# sort them by the cpu_time, and then see the top offenders
ncatted_procs = eq.get_procs(jobs_orm, 
                             tags = {'op': 'ncatted'}, 
                             order=eq.desc(eq.Process.cpu_time), 
                             limit=10,
                             fmt='pandas')
ncatted_procs[['jobid', 'tags', 'exename', 'duration', 'cpu_time']]

We could have used a more precise tag, such as `{'op': 'ncatted', 'op_instance': 79}`,
for more granular operation selection. We could also filter on an executable name, such as `ncatted`.
Let's sort the returned list by decreasing process duration.

In [None]:
procs = eq.get_procs(jobs_orm, tags='op:ncatted;op_instance:79', 
                     fltr=(eq.Process.exename == "ncatted"), 
                     order=(eq.desc(eq.Process.duration)), 
                     fmt='pandas')
procs[['jobid', 'tags', 'exename', 'duration', 'cpu_time', 'exitcode']]

### <a name="thread-metrics-aggregation">Process contains aggregated thread metrics (ADVANCED TOPIC)</a>

The `pandas` (and `dict`) formats, in addition to having process-level data in each row, also have fields that represent metrics aggregated across the underlying threads of the process, such as
`rssmax`, `cpu_time`, and `rchar`. The ORM `Process` object instead has a `threads_sums` attribute,
which is a dictionary containing the above fields.

In [None]:
procs.columns.values

### <a name="proc-tree">Process tree and depth (ADVANCED TOPIC)</a>

Every process in a job has a `depth` parameter that denotes its depth
in the process tree, with the root process having a `depth` of `0`.

As process tree construction is expensive, we have disabled
automatic creation of the process tree during ingestion by setting `lazy_compute_process_tree`
to `True` in `settings.py`. This means that the `depth` parameter is
ordinarily left unset in the process ORM, dataframe or dictionaries.
We automatically compute the process tree when it's needed, for example
to determine the root process(es) or operation roots.

If for any reason you would like to have the `depth` parameter populated,
you can call the `compute_process_trees` API call:

In [None]:
eq.compute_process_trees([jobid_choice_1, jobid_choice_2])
procs = eq.get_procs([jobid_choice_1, jobid_choice_2], fmt='pandas', order=eq.desc(eq.Process.depth))
procs[['exename', 'pid', 'depth']][:5]

Above, we compute the process trees for two jobs, and then select their processes ordered by
decreasing depth. As you can see, the process trees have a maximum depth of `7`.

Let's <a name="process-tree-walk"></a>navigate the process tree!

In [None]:
# now let's walk through the process tree. To make this easy, we use the 'orm' format
# first we compute the process tree as we intend to walk down the tree
eq.compute_process_trees([jobid_choice_1])
# let's sort the processes by exclusive cpu time
# You will get a sorted list of ORM objects, let's see the top 10
procs = eq.get_procs(jobid_choice_1, order = (eq.desc(eq.Process.cpu_time)), fmt='orm')[:10]
[p.pid for p in procs]

In [None]:
# let's pick the first one
p = procs[0]

In [None]:
p.exename

In [None]:
p.exename, p.args, p.duration, len(p.children), p.numtids

In [None]:
parent = p.parent
parent

In [None]:
parent.exename, parent.args, parent.pid, len(parent.children), len(parent.descendants)

In [None]:
# let's see the aggregate thread metrics for this process
p.threads_sums
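
The `depth`, `children` and `descendants` attributes used in the cells above all derive from the parent/child relationship between processes. The sketch below shows one way these could be computed from a simple pid-to-parent map. The map and the helper functions are hypothetical and only illustrate the relationships; the ORM computes and stores these for you.

```python
# Hypothetical pid -> parent pid map for one job; pid 10 is the root
parent_of = {10: None, 11: 10, 12: 10, 13: 11, 14: 13}

def depth(pid):
    """Number of edges between `pid` and the root of its tree (the root has depth 0)."""
    d = 0
    while parent_of[pid] is not None:
        pid = parent_of[pid]
        d += 1
    return d

def children(pid):
    """Direct children of `pid`."""
    return sorted(p for p, parent in parent_of.items() if parent == pid)

def descendants(pid):
    """All processes below `pid` in the tree, found by walking down level by level."""
    found = []
    frontier = children(pid)
    while frontier:
        found.extend(frontier)
        frontier = [c for p in frontier for c in children(p)]
    return sorted(found)

print({pid: depth(pid) for pid in parent_of})
print(children(10), descendants(11))
```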

### <a name="proc-roots">Process Roots</a>

Often we are interested in finding the root process(es) in a **job**. We have two API calls to help facilitate that: `get_roots` and `root`. 

`get_roots` and `root` are different from `op_roots`, which we discussed earlier: the former find the root process(es) in a **job** or **collection of jobs**, while the latter finds the root processes in an **operation**.

In [None]:
root_procs = eq.get_roots([jobid_choice_1], fmt='orm')
root_procs.count()

As you can see we don't have a process tree rooted in a single process but, rather, a
*forest* with multiple root processes. Let's see some details about the root processes: 

In [None]:
# the processes are ordered (by default) in order of start time
for r in root_procs: print(r.exename, r.args, r.pid, r.ppid, r.start)

In [None]:
# eq.root picks up the very first process by start time
first_root = eq.root([jobid_choice_1], fmt='orm')
first_root.exename, first_root.args, first_root.pid, first_root.ppid, first_root.start

Looking at the `ppid` field, we can see that the root processes returned by `get_roots` are actually siblings
spawned by a single process. The `pid` of the parent is shown as `1`. You can [read more here about why the parent process becomes init for orphan processes](https://unix.stackexchange.com/questions/476191/processes-ppid-changed-to-1-after-closing-parent-shell). `eq.root`, on the other hand, just returns the first process from the sibling set.

We recommend you use `get_roots` for completeness. However, in certain situations `root` suffices.
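
The difference between a forest of roots and a single "first" root can be sketched as follows. The process records and the `job_roots` and `first_root` helpers are hypothetical; the rule used here (a root is a process whose parent is not part of the job) is an assumption made for the illustration and may not match how `get_roots` decides internally.

```python
# Hypothetical processes of one job; ppid 1 means the original parent has exited
procs = [
    {'pid': 500, 'ppid': 1,   'start': 30},
    {'pid': 400, 'ppid': 1,   'start': 10},
    {'pid': 401, 'ppid': 400, 'start': 12},
    {'pid': 600, 'ppid': 1,   'start': 20},
]

def job_roots(procs):
    """Processes whose parent is not part of the job, ordered by start time."""
    pids = {p['pid'] for p in procs}
    roots = [p for p in procs if p['ppid'] not in pids]
    return sorted(roots, key=lambda p: p['start'])

def first_root(procs):
    """The earliest-starting root, or None for an empty job."""
    roots = job_roots(procs)
    return roots[0] if roots else None

print([p['pid'] for p in job_roots(procs)])
print(first_root(procs)['pid'])
```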

### <a name="procs-histogram">Process Histogram</a>

Often we want to determine a histogram for an attribute in the process model. For instance, in a job or job collection, how often did we use a particular `executable`? Or, how many processes were multithreaded?

In [None]:
# we can give a single job or a list of jobs
eq.procs_histogram([jobid_choice_1], attr='exename')

In [None]:
eq.procs_histogram([jobid_choice_1], attr='numtids')

So the job contained `3346` single-thread processes. `7` processes had two threads each, and `59` processes had `4` threads each. 

You may want to refer to the [model attributes](#useful-attributes) for a listing of the attributes in the
`Process` model.
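
A histogram of this kind is simply a count of how often each value occurs. The sketch below reproduces the thread-count breakdown quoted above with `collections.Counter`; the `numtids` list is constructed by hand to match those counts and is not read from the database.

```python
from collections import Counter

# Thread counts per process, constructed to match the counts quoted above
numtids = [1] * 3346 + [2] * 7 + [4] * 59

histogram = Counter(numtids)
print(dict(histogram))

# Number of multithreaded processes (more than one thread)
multithreaded = sum(count for tids, count in histogram.items() if tids > 1)
print(multithreaded)
```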

### <a name="timeline">Timeline</a>
Sometimes you want to get a timeline of the processes in the order they were spawned:

In [None]:
procs = eq.timeline(jobs, fmt='pandas', limit=25)
procs[['exename', 'tags', 'start', 'duration']]

## <a name="thread-query">Thread Query</a>

The thread query requires passing one or more *process primary keys* or `Process`
objects to `get_thread_metrics`. Let's illustrate this with an example, where
we first obtain the <a name="root-process">root process</a> of a job:

In [None]:
# let's find the root process for a particular job
root = eq.root(jobid_choice_1, fmt='orm')
root.pid

In [None]:
root_threads_df = eq.get_thread_metrics(root)
display(root_threads_df.columns.values)
root_threads_df[['process_pk', 'tid', 'usertime', 'systemtime', 'rssmax']]

## <a name="useful-attributes">Useful attributes in Job, Process and Thread objects</a>

The following are some useful attributes of the job, process and thread objects.
They are accessible when using the `orm` format, and are also available in the
`pandas` and `dict` formats. There is one important thing to note:

`proc_sums` field of the Job object is masked for `pandas` and `dict` formats
and the underlying keys of the dictionary are exposed at the top-level.

`threads_sums` field of the Process object is masked for `pandas` and `dict` format
and the underlying keys of the dictionary are exposed at the top-level.
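
The "masking" described above amounts to flattening a nested dictionary into its parent record. A minimal sketch, using a made-up job record and a hypothetical `flatten` helper:

```python
def flatten(record, nested_key):
    """Return a copy of `record` with the dict under `nested_key` merged into the top level."""
    flat = {k: v for k, v in record.items() if k != nested_key}
    flat.update(record.get(nested_key) or {})
    return flat

# Hypothetical ORM-style job record with a nested proc_sums dictionary
job = {'jobid': 'job-a', 'duration': 100.0, 'proc_sums': {'rchar': 2048, 'syscr': 17}}

flat_job = flatten(job, 'proc_sums')
print(flat_job)
```

This is why columns such as `rchar` or `syscr` appear directly in a jobs dataframe, while in the `orm` format you would reach them through the `proc_sums` attribute.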

### Attributes of the Job ORM Model
 - `duration`: this is the wallclock time in microseconds
 - `cpu_time`: user+system time aggregated across all processes of the job
 - `start`:    start time in microseconds since epoch
 - `end`:      end time in microseconds since epoch
 - `jobid`:    database id for job (unique)
 - `exitcode`: return code from job
 - `tags`:     dict of key/value pairs
 - `processes`:list of processes belonging to job
 - `proc_sums`: aggregates across processes of a job
 

### Attributes of the Process ORM Model
 - `duration`: this is the wallclock time in microseconds
 - `cpu_time`: exclusive user+system time for process (aggregated across its threads)
 - `inclusive_cpu_time`: user+system time for the process and *all its descendants*
 - `start`:    start time in microseconds since epoch
 - `end`:      end time in microseconds since epoch
 - `tags`:     dict of key/value pairs
 - `threads_df`: json serialized dataframe of process threads (ADVANCED)
 - `threads_sums`: key/value pairs consisting of sums of thread metrics (ADVANCED)
 - `numtids`:  number of threads
 - `exename`
 - `args`
 - `pid`
 - `ppid`
 - `id`:       database ID for process
 - `exitcode`
 - `parent`
 - `children`
 - `ancestors`
 - `descendants`
 
 
### Attributes of the Thread ORM Model
 - `usertime`
 - `systemtime`
 - `user+system`
 - `rssmax`
 - `majflt`
 - `read_bytes`
 - `write_bytes`

## <a name="case-study">Case Study using the API</a>

Let's walk through a contrived example that shows how you may use the 
API at different levels (job, operation and processes).

In [None]:
# let's pick a job 30327560, and see what the top-level job query yields us.
# We will choose the pandas dataframe 
eq.get_jobs(jobid_choice_1, fmt='pandas')

In [None]:
# now get the processes that are part of this job, let's sort them by the inclusive time
# we need to pass in the job id to restrict the query to a particular job
# the inclusive_cpu_time sums the cpu times of the process and all its descendants
# in this case you can see that the top-level shell process shows up at the top
procs = eq.get_procs(jobid_choice_1, order = (eq.desc(eq.Process.inclusive_cpu_time)), fmt='pandas', limit=10)
procs[['exename', 'duration', 'inclusive_cpu_time', 'cpu_time']]

In [None]:
# you may also want to see the processes that were the big CPU hogs in the job
# so here we order by decreasing cpu time, see the top-10 using limit
cpu_hogs = eq.get_procs(jobid_choice_1, order = (eq.desc(eq.Process.cpu_time)), fmt='pandas', limit=10)
cpu_hogs[['exename', 'args', 'tags', 'depth', 'cpu_time', 'duration']]

In [None]:
# the fregrid process took pretty long. Let's see the fregrid operation in more detail
# let's see which processes took the top *inclusive* cpu time.
# Let's limit the output to the top 10 results
# and let's get a pandas dataframe
fregrid_op = eq.get_procs([jobid_choice_1], tags = 'op:fregrid', order=eq.desc(eq.Process.inclusive_cpu_time), limit=10, fmt='pandas')
fregrid_op

So the `fregrid` operation had multiple invocations of the `fregrid` executable, and that
executable and `mv` took the longest time to complete.

<a name="failed-procs"></a>Let's see if there are any failed processes in our job selection.

In [None]:
# Let's find the failed processes across our jobs subset
failed_procs = eq.get_procs([jobid_choice_1], fltr=(eq.Process.exitcode > 0), fmt='orm')
failed_procs.count()

That's a large number of failed processes!

In [None]:
# The orm also gives an easy way to navigate the process hierarchy
# Let's use the ORM directly to walk through the job
j = eq.get_jobs(jobid_choice_1, fmt='orm').first()
j

In [None]:
# Notice we have a Job object. The processes in the job
# are available as j.processes
j.duration, j.cpu_time, j.exitcode, j.tags

In [None]:
# first we ask for the aggregate metrics for a single job
# Here, we don't specify any tags. For single jobs, when
# we don't specify the operation/tags, they are queried from the job
op_sums = eq.get_op_metrics(jobs=jobid_choice_1, fmt='pandas')
display(op_sums.columns.values)
# let's see the first 10 rows
op_sums[['jobid', 'tags', 'duration', 'cpu_time']][:10]

## <a name="modify-job-metadata">Modifying Job Metadata</a>

Every job stores metadata such as the job name, `jobid`, and `tags`. Most metadata
fields are not designed to be mutable. However, there are fields, such
as `annotations`, that may be modified using the API.

### <a name="annotate-jobs">Annotate Jobs</a>

Job annotations allow you to store arbitrary key/value pairs
in a persistent manner. They may be retrieved later, modified, and
saved again. There is no semantic meaning associated with annotations
other than what the user ascribes to them. 

In [None]:
eq.get_job_annotations(jobid_choice_1)

In [None]:
eq.annotate_job(jobid_choice_1, {'abc': 100})

In [None]:
eq.get_job_annotations(jobid_choice_1)

In [None]:
eq.annotate_job(jobid_choice_1, {'def': 200})

In [None]:
eq.get_job_annotations(jobid_choice_1)

You will notice that `annotate_job` *merges* (or *updates*) the dictionary.
It doesn't remove existing keys; an existing key changes only if you overwrite it with a new value.

In [None]:
eq.annotate_job(jobid_choice_1, {'abc': 500})

If you wish to replace the existing annotations completely, you should
pass `replace = True` to `annotate_job`.

In [None]:
eq.annotate_job(jobid_choice_1, {'my_key': "something new"}, replace = True)

In [None]:
eq.get_job_annotations(jobid_choice_1)

To remove all annotations:

In [None]:
eq.remove_job_annotations(jobid_choice_1)

In [None]:
eq.get_job_annotations(jobid_choice_1)
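
The merge and replace behaviours described in this section follow ordinary dictionary semantics. The sketch below replays the same sequence of annotations on a plain `dict`; the `annotate` helper is a hypothetical stand-in that only models the stored result, not the database call itself.

```python
def annotate(existing, new, replace=False):
    """Return the annotations that would be stored after applying `new`."""
    if replace:
        # replace: discard whatever was stored before
        return dict(new)
    # default: merge, letting new values overwrite existing keys
    merged = dict(existing)
    merged.update(new)
    return merged

ann = {}
ann = annotate(ann, {'abc': 100})
ann = annotate(ann, {'def': 200})
after_merge = annotate(ann, {'abc': 500})
print(after_merge)

after_replace = annotate(after_merge, {'my_key': 'something new'}, replace=True)
print(after_replace)
```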

### <a name="analyze-jobs">Analyze Jobs</a>

We have developed API calls to run simple analysis pipelines, such
as outlier detection on jobs. The output from such analyses is
persisted as a dictionary along with the job in the database, and
can be retrieved later.

Please note that the current analysis
pipeline is simplistic and will be improved. It is meant solely
for illustrative purposes and to facilitate feedback to spur
subsequent development.

In [None]:
# request list of *all* unanalyzed jobs
eq.get_unanalyzed_jobs()

In [None]:
# usually we care about a subset of recent jobs rather than
# querying the whole database
eq.get_unanalyzed_jobs(['625172', '627922',jobid_choice_1,'633144'])

You may also care about a specific analysis, such as outlier detection
in which case you can pass an `analyses_filter` to define what an 
"analyzed" job means. We will cover this later in an example once we
have some jobs that we have analyzed.

First let's run an analysis pipeline on some jobs:

In [None]:
# here we pass a list of jobs we want to analyze. If you don't pass
# an argument (or pass an empty list), all pending jobs will be analyzed.
# That could take very long, so here we restrict the active set.
num_filters_executed = eq.analyze_pending_jobs(['625172', '627922', jobid_choice_1, '633144'])

`analyze_pending_jobs` returns the number of analysis methods executed on the
jobs. At present we only have `outlier_detection` in our pipeline, so
`num_filters_executed` will be `1`. If you run the same call again, the
outlier detection algorithm will not be run again. Now, let's see what
results came from our analysis.

In [None]:
eq.get_job_analyses('625172')

Since we ran outlier detection on 4 jobs, each of the jobs has the same
analyses saved. We queried one of the jobs. We could have queried one
of the other jobs and got the same output.

In [None]:
eq.get_job_analyses('627922')

The results suggest that based on each of the features -- `num_procs`,
`duration`, and `cpu_time` -- job `625172` is an outlier, while
the other three jobs `['633144', jobid_choice_1, '627922']` form an 
equivalent set.

Now if we query the list of unanalyzed jobs, these 4 jobs will be
absent.

In [None]:
eq.get_unanalyzed_jobs()

## <a name="delete-jobs">Deleting Jobs</a>

In [None]:
len(eq.get_jobs(fmt='terse'))

In [None]:
# to delete a single job
# the function returns the number of jobs deleted
eq.delete_jobs('804285')

In [None]:
# to delete multiple jobs, you need to set a second `force` argument
eq.delete_jobs(['693147','696127'], force=True)

### <a name="delete-jobs-time-based">Time-based Job Deletion</a>

In [None]:
# to delete all jobs older than 30 days
# Very useful in a cron job!
eq.delete_jobs([], force=True, before=-30)

In [None]:
# to delete jobs that were executed in the last week
eq.delete_jobs([], force=True, after=-7)

In [None]:
# to delete jobs older than a specific date
# delete jobs before Jan 21, 2018 09:55
eq.delete_jobs([], force=True, before='01/21/2018 09:55')
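
The `before` and `after` arguments above accept either a negative number of days or a date string. As a rough illustration of how such values could map to a cutoff time, consider the sketch below. The `resolve_cutoff` helper, the "negative number means days ago" rule and the `%m/%d/%Y %H:%M` format are assumptions inferred from the examples above, not EPMT's documented parsing logic.

```python
from datetime import datetime, timedelta

def resolve_cutoff(value, now):
    """Turn a relative day offset or a date string into a datetime."""
    if isinstance(value, (int, float)):
        # negative numbers are read as "days ago" relative to `now`
        return now + timedelta(days=value)
    return datetime.strptime(value, '%m/%d/%Y %H:%M')

now = datetime(2020, 6, 30, 12, 0)   # fixed "current time" so the output is reproducible
print(resolve_cutoff(-30, now))
print(resolve_cutoff(-7, now))
print(resolve_cutoff('01/21/2018 09:55', now))
```

With a cutoff in hand, `before` would select jobs that ran earlier than the cutoff and `after` would select jobs that ran later.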