# Used packages and general settings

In [None]:
import re
import datetime
import csv
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from elasticsearch import Elasticsearch

%matplotlib inline

# Elasticsearch configuration

In [None]:
username = "username"
password = "password"
es = Elasticsearch([{"host": "es-cms.cern.ch", "port": 9203, "http_auth": username + ":" + password}], use_ssl=True, verify_certs=True, ca_certs="ca-bundle.trust.crt")

# Time filter

In [None]:
def time_filter(days=0, until=0):
    indices = es.cat.indices(index="cms-20*", h="index", request_timeout=600).split("\n")
    indices = sorted(indices)
    indices = [x for x in indices if x != ""]
    if days == 0:
        return ["cms-20*"]
    today = datetime.date.today()
    filtered = []
    datefmt = "%Y-%m-%d"
    for i in indices:
        date = re.sub(r"cms-", "", i).rstrip()
        date = datetime.datetime.strptime(date, datefmt).date()
        diff = today - date
        if until <= diff.days < days + until:
            filtered.append(i.rstrip())
    return filtered

# Indices to be considered

In [None]:
no_of_days = 0
last_day = 0
ind = time_filter(no_of_days, last_day)
ind = ",".join(ind)

# Part 1

For all successful jobs (ExitCode=0) which have a positive EventRate, make a 2D plot of the CPU efficiency (CPUTimeHr/CommittedCoreHr) versus the throughput (EventRate).

## Query (2017 01 01 - 2017 07 01)

In [None]:
body_until_2017_07_01 = {
    "size": 10000,
    "_source": ["CpuTimeHr", "CommittedCoreHr", "EventRate", "MyEff"],
    "query": {
        "bool": {
            "must": [
                {
                    "match": {
                        "Status": "Completed"
                    }
                },
                {
                    "match": {
                        "ExitCode": 0
                    }
                },
                {
                    "range": {
                        "CpuTimeHr": {
                            "gt": 0
                        }
                    }
                },
                {
                    "range": {
                        "CommittedCoreHr": {
                            "gt": 0
                        }
                    }
                },
                {
                    "range": {
                        "EventRate": {
                            "gt": 0
                        }
                    }
                },
                {
                    "range": {
                        "RecordTime": {
                            "gte": 1483228800000,
                            "lte": 1498860000000,
                            "format": "epoch_millis"
                        }
                    }
                }
            ]
        }
    },
    "script_fields": {
        "MyEff": {
            "script": "doc['CpuTimeHr'].value / doc['CommittedCoreHr'].value"
        }
    }
}

res_until_2017_07_01 = es.search(index=ind, body=body_until_2017_07_01, request_timeout=1200)

## Query (2017 07 01 - today)

In [None]:
body_from_2017_07_01 = {
    "size": 10000,
    "_source": ["CpuTimeHr", "CommittedCoreHr", "EventRate", "MyEff"],
    "query": {
        "bool": {
            "must": [
                {
                    "match": {
                        "Status": "Completed"
                    }
                },
                {
                    "match": {
                        "ExitCode": 0
                    }
                },
                {
                    "range": {
                        "CpuTimeHr": {
                            "gt": 0
                        }
                    }
                },
                {
                    "range": {
                        "CommittedCoreHr": {
                            "gt": 0
                        }
                    }
                },
                {
                    "range": {
                        "EventRate": {
                            "gt": 0
                        }
                    }
                },
                {
                    "range": {
                        "RecordTime": {
                            "gte": 1498860000000,
                            "format": "epoch_millis"
                        }
                    }
                }
            ]
        }
    },
    "script_fields": {
        "MyEff": {
            "script": "doc['CpuTimeHr'].value / doc['CommittedCoreHr'].value"
        }
    }
}

res_from_2017_07_01 = es.search(index=ind, body=body_from_2017_07_01, request_timeout=1200)

## Function for listing the CPU efficiency and event rate

In [None]:
def listing_cpu_eff_and_event_rate(result):
    list_of_event_rate = []
    list_of_cpu_eff = []
    records = result["hits"]["hits"]
    for record in records:
        list_of_event_rate.append(record["_source"]["EventRate"])
        list_of_cpu_eff.append(record["fields"]["MyEff"])
    return list_of_event_rate, list_of_cpu_eff

## Listing the CPU efficiency and event rate

In [None]:
event_rate_until, cpu_eff_until = listing_cpu_eff_and_event_rate(res_until_2017_07_01)
event_rate_from, cpu_eff_from = listing_cpu_eff_and_event_rate(res_from_2017_07_01)

## Graph of CPU efficiency and event rate

In [None]:
def correlation_graph(list_of_cpu_eff, list_of_event_rate, title):
    y_max = sorted(list_of_event_rate)[-200:-199][0]
    for i in range(0, 2):
        plt.rcParams['figure.figsize'] = (20, 4)
        plt.rcParams.update({"font.size": 15})
        if i == 1:
            plt.yscale("log")
            plt.ylabel("Event rate (log)")
            plt.title(title + " (logarithmic scale)")
        else:
            plt.ylabel("Event rate")
            plt.title(title + " (linear scale)")
            plt.ylim([0, y_max])
        plt.scatter(list_of_cpu_eff, list_of_event_rate)
        plt.xlabel("CPU efficiency")
        plt.xlim([0.0, 1.0])
        plt.show()

## Graphs of CPU efficiency and event rate

In [None]:
correlation_graph(cpu_eff_until, event_rate_until, "2017 01 01 - 2017 07 01")
correlation_graph(cpu_eff_from, event_rate_until, "2017 07 01 - today")

# Part 2

Do as above but separately for the most important TaskType values.

## Considered task types

In [None]:
task_types = ["GENSIM", "DataProcessing", "RECO", "MINIAOD", "Merge", "DIGI"]

## Queries

In [None]:
res = {}
for task_type in task_types:
    body = {
        "size": 10000,
        "_source": ["CpuTimeHr", "CommittedCoreHr", "EventRate", "MyEff"],
        "query": {
            "bool": {
                "must": [
                    {
                        "match": {
                            "Status": "Completed"
                        }
                    },
                    {
                        "term": {
                            "TaskType": task_type
                        }
                    },
                    {
                        "match": {
                            "ExitCode": 0
                        }
                    },
                    {
                        "range": {
                            "CpuTimeHr": {
                                "gt": 0
                            }
                        }
                    },
                    {
                        "range": {
                            "CommittedCoreHr": {
                                "gt": 0
                            }
                        }
                    },
                    {
                        "range": {
                            "EventRate": {
                                "gt": 0
                            }
                        }
                    },
                    {
                        "range": {
                            "RecordTime": {
                                "gte": 1498860000000,
                                "format": "epoch_millis"
                            }
                        }
                    }
                ]
            }
        },
        "script_fields": {
            "MyEff": {
                "script": "doc['CpuTimeHr'].value / doc['CommittedCoreHr'].value"
            }
        }
    }
    res[task_type] = es.search(index=ind, body=body, request_timeout=1200)

## Graphs of CPU efficiency and event rate

In [None]:
for key, value in res.iteritems():
    event_rate, cpu_eff = listing_cpu_eff_and_event_rate(value)
    correlation_graph(cpu_eff, event_rate, key)

# Part 3

For each important TaskType, find a subtask (a set of jobs sharing the same value for the WMAgent_SubTaskName attribute) with a very large number of jobs (for example, the one with the most jobs) and generate the 2D plot for those jobs.

## Queries for finding a subtask with a very large number of jobs

In [None]:
res = {}
for task_type in task_types:
    body = {
        "size": 0,
        "_source": ["CpuTimeHr", "CommittedCoreHr", "EventRate", "MyEff", "WMAgent_SubTaskName"],
        "query": {
            "bool": {
                "must": [
                    {
                        "match": {
                            "Status": "Completed"
                        }
                    },
                    {
                        "term": {
                            "TaskType": task_type
                        }
                    },
                    {
                        "match": {
                            "ExitCode": 0
                        }
                    },
                    {
                        "range": {
                            "CpuTimeHr": {
                                "gt": 0
                            }
                        }
                    },
                    {
                        "range": {
                            "CommittedCoreHr": {
                                "gt": 0
                            }
                        }
                    },
                    {
                        "range": {
                            "EventRate": {
                                "gt": 0
                            }
                        }
                    },
                    {
                        "range": {
                            "RecordTime": {
                                "gte": 1498860000000,
                                "format": "epoch_millis"
                            }
                        }
                    }
                ]
            }
        },
        "script_fields": {
            "MyEff": {
                "script": "doc['CpuTimeHr'].value / doc['CommittedCoreHr'].value"
            }
        },
        "aggs": {
            "TaskType": {
                "terms": {
                    "field": "TaskType"
                },
                "aggs": {
                    "WMAgent_SubTaskName": {
                        "terms": {
                            "field": "WMAgent_SubTaskName",
                            "size": 1
                        }
                    }
                }
            }
        }
    }
    res[task_type] = es.search(index=ind, body=body, request_timeout=1200)

## Considered subtask of considered task types

In [None]:
subtask = {}
for key, value in res.iteritems():
    subtask[key] = value["aggregations"]["TaskType"]["buckets"][0]["WMAgent_SubTaskName"]["buckets"][0]["key"]

## Queries for finding CPU efficiency and event rate

In [None]:
res = {}
for key, value in subtask.iteritems():
    body = {
        "size": 10000,
        "_source": ["CpuTimeHr", "CommittedCoreHr", "EventRate", "MyEff"],
        "query": {
            "bool": {
                "must": [
                    {
                        "match": {
                            "Status": "Completed"
                        }
                    },
                    {
                        "term": {
                            "TaskType": key
                        }
                    },
                    {
                        "term": {
                            "WMAgent_SubTaskName": value
                        }
                    },
                    {
                        "match": {
                            "ExitCode": 0
                        }
                    },
                    {
                        "range": {
                            "CpuTimeHr": {
                                "gt": 0
                            }
                        }
                    },
                    {
                        "range": {
                            "CommittedCoreHr": {
                                "gt": 0
                            }
                        }
                    },
                    {
                        "range": {
                            "EventRate": {
                                "gt": 0
                            }
                        }
                    },
                    {
                        "range": {
                            "RecordTime": {
                                "gte": 1498860000000,
                                "format": "epoch_millis"
                            }
                        }
                    }
                ]
            }
        },
        "script_fields": {
            "MyEff": {
                "script": "doc['CpuTimeHr'].value / doc['CommittedCoreHr'].value"
            }
        }
    }
    res[key] = es.search(index=ind, body=body, request_timeout=1200)

## Graphs of CPU efficiency and event rate

In [None]:
for key, value in res.iteritems():
    event_rate, cpu_eff = listing_cpu_eff_and_event_rate(value)
    correlation_graph(cpu_eff, event_rate, "The largest subtask of " + key + " jobs")

# Part 4

Another task, after working on the plots, could be to add this aggregation to the ES query, so that the correlation can be calculated by ES for a much larger sample of jobs with respect to what we can do in a notebook. To plot distribution of correlation, we can weigh the correlations with the cumulative CoreHr of the jobs in the bucket: this is to avoid giving the same weight to small samples of short jobs and to large samples of long jobs.

## Query

In [None]:
body_for_production_jobs = {
    "size": 0,
    "query": {
        "bool": {
            "filter": [
                {
                    "match": {
                        "Type": "production"
                    }
                },
                {
                    "match": {
                        "ExitCode": 0
                    }
                },
                {
                    "match": {
                        "Status": "Completed"
                    }
                },
                {
                    "range": {
                        "CommittedCoreHr": {
                            "gt": 0
                        }
                    }
                },
                {
                    "range": {
                        "CpuTimeHr": {
                            "gt": 0
                        }
                    }
                },
                {
                    "range": {
                        "EventRate": {
                            "gt": 0
                        }
                    }
                },
                {
                    "range": {
                        "CpuEff": {
                            "gte": 0,
                            "lte": 1
                        }
                    }
                }
            ]
        }
    },
    "aggs": {
        "ttypes": {
            "terms": {
                "field": "TaskType",
                "size": 1000
            },
            "aggs": {
                "site": {
                    "terms": {
                        "field": "Site",
                        "size": 1000
                    },
                    "aggs": {
                        "wf": {
                            "terms": {
                                "field": "WMAgent_SubTaskName",
                                "size": 100000
                            },
                            "aggs": {
                                "type": {
                                    "terms": {
                                        "field": "Type",
                                        "size": 10
                                    },
                                    "aggs": {
                                        "exitcode": {
                                            "terms": {
                                                "field": "ExitCode",
                                                "size": 1000
                                            },
                                            "aggs": {
                                                "loc": {
                                                    "terms": {
                                                        "field": "InputData",
                                                        "size": 10
                                                    },
                                                    "aggs": {
                                                        "ex_job": {
                                                            "top_hits": {
                                                                "sort": [
                                                                    {
                                                                        "CompletionDate": {
                                                                            "order": "desc"
                                                                        }
                                                                    }
                                                                ],
                                                                "_source": {
                                                                    "includes": [
                                                                        "WMAgent_JobID",
                                                                        "GlobalJobId"
                                                                    ]
                                                                },
                                                                "size": 1
                                                            }
                                                        },
                                                        "3": {
                                                            "stats": {
                                                                "field": "CpuTimeHr"
                                                            }
                                                        },
                                                        "4": {
                                                            "stats": {
                                                                "field": "CommittedCoreHr"
                                                            }
                                                        },
                                                        "5": {
                                                            "stats": {
                                                                "field": "CoreHr"
                                                            }
                                                        },
                                                        "6": {
                                                            "avg": {
                                                                "field": "RequestCpus"
                                                            }
                                                        },
                                                        "7": {
                                                            "avg": {
                                                                "script": {
                                                                    "inline": "((doc['CommittedCoreHr'].value > 0) ? doc['CpuTimeHr'].value / doc['CommittedCoreHr'].value : 0)",
                                                                    "lang": "painless"
                                                                }
                                                            }
                                                        },
                                                        "8": {
                                                            "avg": {
                                                                "field": "CpuEff"
                                                            }
                                                        },
                                                        "16": {
                                                            "avg": {
                                                                "field": "CompletionDate"
                                                            }
                                                        },
                                                        "17": {
                                                            "stats": {
                                                                "field": "RemoteWallClockTime"
                                                            }
                                                        },
                                                        "matrixstats": {
                                                            "matrix_stats": {
                                                                "fields": ["EventRate", "CpuEff"]
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

result_for_production_jobs = es.search(index=ind, body=body_for_production_jobs, request_timeout=12000)

In [None]:
body_for_analysis_jobs = {
    "size": 0,
    "query": {
        "bool": {
            "filter": [
                {
                    "match": {
                        "Type": "analysis"
                    }
                },
                {
                    "exists": {
                        "field": "ExitCode"
                    }
                },
                {
                    "match": {
                        "Status": "Completed"
                    }
                },
                {
                    "range": {
                        "CommittedCoreHr": {
                            "gt": 0
                        }
                    }
                }
            ]
        }
    },
    "aggs": {
        "ttypes": {
            "terms": {
                "field": "TaskType",
                "size": 10
            },
            "aggs": {
                "site": {
                    "terms": {
                        "field": "Site",
                        "size": 10
                    },
                    "aggs": {
                        "wf": {
                            "terms": {
                                "field": "CRAB_Workflow",
                                "size": 10
                            },
                            "aggs": {
                                "type": {
                                    "terms": {
                                        "field": "Type",
                                        "size": 10
                                    },
                                    "aggs": {
                                        "exitcode": {
                                            "terms": {
                                                "field": "ExitCode",
                                                "size": 10
                                            },
                                            "aggs": {
                                                "loc": {
                                                    "terms": {
                                                        "field": "InputData",
                                                        "size": 10
                                                    },
                                                    "aggs": {
                                                        "3": {
                                                            "stats": {
                                                                "field": "CpuTimeHr"
                                                            }
                                                        },
                                                        "4": {
                                                            "stats": {
                                                                "field": "CommittedCoreHr"
                                                            }
                                                        },
                                                        "5": {
                                                            "stats": {
                                                                "field": "CoreHr"
                                                            }
                                                        },
                                                        "6": {
                                                            "avg": {
                                                                "field": "RequestCpus"
                                                            }
                                                        },
                                                        "7": {
                                                            "avg": {
                                                                "script": {
                                                                    "inline": "((doc['CommittedCoreHr'].value > 0) ? doc['CpuTimeHr'].value / doc['CommittedCoreHr'].value : 0)",
                                                                    "lang": "painless"
                                                                }
                                                            }
                                                        },
                                                        "8": {
                                                            "avg": {
                                                                "field": "CpuEff"
                                                            }
                                                        },
                                                        "16": {
                                                            "avg": {
                                                                "field": "CompletionDate"
                                                            }
                                                        },
                                                        "17": {
                                                            "stats": {
                                                                "field": "RemoteWallClockTime"
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

result_for_analysis_jobs = es.search(index=ind, body=body_for_analysis_jobs, request_timeout=12000)

## Output file

In [None]:
output_file = "task_cms_ineff_test2.csv"

## Write to output file

In [None]:
names = ["task", "tasktype", "site", "type", "exitcode", "locality", "njobs",
         "cpu", "cpu_avg", "cwc", "cwc_avg", "wc", "wc_avg", "cores", "eff", "reff",
         "date", "rwct", "rwct_avg", "retr_str", "correlation_of_cpu_eff_and_event_rate"]

with open(output_file, "w") as f:
    writer = csv.writer(f, lineterminator="\n")
    writer.writerow(names)
    for res in (result_for_production_jobs, result_for_analysis_jobs):
        buckets1 = res["aggregations"]["ttypes"]["buckets"]
        for b1 in buckets1:
            ttype = b1["key"]
            buckets2 = b1["site"]["buckets"]
            for b2 in buckets2:
                site = b2["key"]
                buckets3 = b2["wf"]["buckets"]
                for b3 in buckets3:
                    wf = b3["key"]
                    buckets4 = b3["type"]["buckets"]
                    for b4 in buckets4:
                        typ = b4["key"]
                        buckets5 = b4["exitcode"]["buckets"]
                        for b5 in buckets5:
                            exitcode = b5["key"]
                            buckets6 = b5["loc"]["buckets"]
                            for b6 in buckets6:
                                loc = b6["key"]
                                cnt = b6["doc_count"]
                                try:
                                    if typ == "production":
                                        retrieve = r'[["' + wf.split("/")[1] + '","' + b6["ex_job"]["hits"]["hits"][0]["_source"]["GlobalJobId"].split("#")[0] + ":" + str(b6["ex_job"]["hits"]["hits"][0]["_source"]["WMAgent_JobID"]) + '"]]'
                                        correlation = b6["matrixstats"]["fields"][0]["correlation"]["EventRate"]
                                    else:
                                        retrieve = "na"
                                        correlation = "na"
                                    writer.writerow([
                                        wf,
                                        ttype,
                                        site,
                                        typ,
                                        exitcode,
                                        loc,
                                        cnt,
                                        b6["3"]["sum"] * 3600.,
                                        b6["3"]["avg"] * 3600.,
                                        b6["4"]["sum"] * 3600.,
                                        b6["4"]["avg"] * 3600.,
                                        b6["5"]["sum"] * 3600.,
                                        b6["5"]["avg"] * 3600.,
                                        b6["6"]["value"],
                                        b6["7"]["value"] * 100.,
                                        b6["8"]["value"],
                                        b6["16"]["value"],
                                        b6["17"]["sum"],
                                        b6["17"]["avg"],
                                        retrieve,
                                        correlation
                                    ])
                                except:
                                    pass

## Input file

In [None]:
input_file = "task_cms_ineff_test2.csv"

## Read CSV (comma-separated) file into DataFrame

In [None]:
df = pd.read_csv(input_file, dtype={"cpu_n": "float"}, low_memory=True)

## Data preparation

In [None]:
correlation_df = df[["tasktype", "wc", "correlation_of_cpu_eff_and_event_rate"]]
correlation_df = correlation_df[correlation_df.correlation_of_cpu_eff_and_event_rate != "na"].copy()
correlation_df = correlation_df.dropna()
correlation_df[["wc", "correlation_of_cpu_eff_and_event_rate"]] = correlation_df[["wc", "correlation_of_cpu_eff_and_event_rate"]].apply(pd.to_numeric)

## Function for plotting distribution of correlation

In [None]:
def histogram(data, weights, title):
    y_values, _, _ = plt.hist(data, bins=100, range=[-1.0, 1.0], weights=weights)
    plt.title(title)
    plt.ylabel("Frequency")
    plt.xlabel("Correlation")
    plt.text(-0.4, max(y_values) / 3 * 2, r"$\mu=%.2f$" % np.average(data, weights=weights))
    plt.show()

## Distribution of correlation

In [None]:
histogram(correlation_df.correlation_of_cpu_eff_and_event_rate, correlation_df.wc, "All task types")

## Distributions of correlation for task types

In [None]:
task_types = set(correlation_df.tasktype)
for task_type in task_types:
    correlation_by_task_type_df = correlation_df[correlation_df.tasktype == task_type]
    histogram(correlation_by_task_type_df.correlation_of_cpu_eff_and_event_rate, correlation_by_task_type_df.wc, task_type)