## Getting Started

In this notebook, we train a neural network and a random forest to predict the queue time of jobs based on their requested resources. Jobs are queried via `qstat` and then cleaned. The prediction models are trained on a sliding window of jobs, so that predictions are localized in time. Only jobs with queue times between 10 and 10,000 minutes are kept; removing the many jobs with negligible queue time mitigates data imbalance.

In [None]:
import sys
sys.path.append('../bin')

import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import random
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor
import sklearn.metrics
import sklearn.model_selection
from sklearn.neural_network import MLPRegressor
from tensorflow import keras

from train import \
    is_categorical, \
    create_dataset, \
    evaluate_cv

## Create Dataset

In [None]:
!qstat -xf > qstat.table.txt

In [None]:
# load qstat data
qstat_file = open('qstat.table.txt')

# parse qstat data into records
rows = []
job = {}
key = None

for line in qstat_file:
    # remove trailing newline
    line = line.rstrip('\n')

    # parse new job on job header
    if line.startswith('Job Id'):
        job['Job_Id'] = line.split(': ')[1]

    # parse new job attribute on space indent
    elif line.startswith('    '):
        key, value = line.strip().split(' = ')
        job[key] = value

    # parse continued job attribute on tab indent
    elif line.startswith('\t'):
        job[key] += line.strip()

    # append complete job on blank line
    elif line == '':
        rows.append(job)
        job = {}
        key = None

    # append extra text to job attribute if possible
    elif key != None:
        job[key] += line.strip()

    # otherwise print warning
    else:
        print('warning: unable to parse line \'%s\'' % (line))

# create dataframe
df = pd.DataFrame(rows)
df.set_index('Job_Id', inplace=True)

# remove unused columns
columns = [
    'Job_Owner',
    'session_id',
    'queue',
    'Resource_List.nodect',
    'Resource_List.ncpus',
    'Resource_List.mpiprocs',
    'Resource_List.mem',
    'Resource_List.ngpus',
    'Resource_List.nphis',
    'Resource_List.place',
    'Resource_List.qcat',
    'Resource_List.walltime',
    'Resource_List.select',
    'exec_vnode',
    'job_state',
    'ctime',
    'etime',
    'qtime',
    'stime',
    'mtime',
    'resources_used.ncpus',
    'resources_used.cput',
    'resources_used.cpupercent',
    'resources_used.mem',
    'resources_used.vmem',
    'resources_used.walltime'
]

df = df[columns]

# select only running or finished jobs
df = df[(df['job_state'] == 'R') | (df['job_state'] == 'F')]

# convert temporal columns to datetime
for column in ['ctime', 'etime', 'qtime', 'stime', 'mtime']:
    df[column] = pd.to_datetime(df[column], infer_datetime_format=True)  

# convert resource list, resources used columns to numerical
df['Resource_List.mem'] = df['Resource_List.mem'].str.strip('gb').astype(int)
df['Resource_List.walltime'] = df['Resource_List.walltime'].apply(lambda td: pd.Timedelta(td).total_seconds() / 3600)
df['resources_used.walltime'] = df['resources_used.walltime'].apply(lambda td: pd.Timedelta(td).total_seconds())

# compute queue time (minutes)
df['queuetime'] = (df['stime'] - df['qtime']).apply(lambda td: td.total_seconds())
df['queuetime'] /= 60

# remove jobs with missing queue time
df = df[~df['queuetime'].isna()]

# compute walltime (sanity check)
df['walltime']  = (df['mtime'] - df['stime']).apply(lambda td: td.total_seconds())

# sort rows by qtime
df = df.sort_values('qtime')

# save dataframe to file
df.to_csv('qstat.table.txt', sep='\t')

## Load Dataset

In [None]:
# load the cleaned table; parse the timestamp columns back into datetimes
# (read_csv would otherwise leave them as plain strings)
df = pd.read_csv(
    'qstat.table.txt',
    sep='\t',
    index_col=0,
    parse_dates=['ctime', 'etime', 'qtime', 'stime', 'mtime'],
)

## Visualize Queue Time

- `ctime`: The time that the job was created.
- `qtime`: The time that the job entered the current queue.
- `etime`: The time that the job became eligible to run, i.e. in a queued state while residing in an execution queue.
- `stime`: The time that the job started execution. Changes when the job is restarted.
- `mtime`: The time that the job was last modified, changed state, or changed locations.

Queue time is computed as `stime - qtime` and reported in minutes.

In [None]:
# check the time span of the dataset: earliest and latest queue entry time
df['qtime'].agg(['min', 'max'])

In [None]:
# summary statistics (count, mean, std, quartiles) of queue time in minutes
df.queuetime.describe()

In [None]:
# plot histogram of queue time, restricted to 10 < queuetime < 10000 minutes
# (the same range that is kept for training in the next section)
queuetime = df['queuetime']
queuetime = queuetime[(10 < queuetime) & (queuetime < 10000)]

fig, ax = plt.subplots(figsize=(16, 4))
sns.histplot(queuetime, ax=ax)
ax.set_xlabel('queue time (minutes)')
ax.set_ylabel('number of jobs')
ax.set_title('Queue time distribution (10 < queuetime < 10000 minutes)')
plt.show()

## Predict Queue Time

In [None]:
# keep only jobs whose queue time lies strictly between 10 and 10000 minutes;
# dropping the many jobs with negligible queue time reduces data imbalance
df = df.query('10 < queuetime < 10000')

In [None]:
# define sliding window: each window holds n consecutive jobs (sorted by qtime)
# and consecutive windows overlap by n - stride jobs
n = 200
stride = n // 2

# seed for the stochastic models so that results are reproducible
seed = 0

# set to False to skip the per-window scatter plots of target vs each input
plot_inputs = True

# define inputs, target
inputs = [
    'Resource_List.nodect',
    'Resource_List.ncpus',
    'Resource_List.mem',
    'Resource_List.ngpus',
    'Resource_List.walltime'
]
target = 'queuetime'

# define models
# NOTE: the random forest is built directly with scikit-learn: `create_rf` is
# not imported in this notebook, and the 'mse' criterion was removed in
# scikit-learn 1.2 (squared error is the default criterion in all versions).
models = [
    ('mlp', MLPRegressor(hidden_layer_sizes=(128, 128, 128), random_state=seed)),
    ('rf', RandomForestRegressor(random_state=seed)),
]


def plot_input_importances(df_window, X, y, inputs, target):
    """Scatter the target against each input, titled with its RF feature importance.

    NOTE(review): assumes the columns of X are in the same order as `inputs`
    (i.e. create_dataset does not expand or reorder them) -- TODO confirm.
    """
    rf = RandomForestRegressor(random_state=seed)
    rf.fit(X, y)

    fig, axes = plt.subplots(1, len(inputs), figsize=(4 * len(inputs), 4), squeeze=False)

    for ax, xaxis, importance in zip(axes.flatten(), inputs, rf.feature_importances_):
        ax.scatter(xaxis, target, data=df_window)
        ax.set_xlabel(xaxis)
        ax.set_ylabel(target)
        ax.set_title('importance = %0.3f' % (importance))

    plt.tight_layout()
    plt.show()


def plot_predictions(y, y_preds, target):
    """Plot expected vs predicted target values for each model, with the identity line."""
    fig, axes = plt.subplots(1, len(y_preds), figsize=(4 * len(y_preds), 4), squeeze=False)

    for (name, y_pred), ax in zip(y_preds.items(), axes.flatten()):
        sns.scatterplot(x=y, y=y_pred, ax=ax)
        vmax = max(y.max(), y_pred.max())
        ax.plot([0, vmax], [0, vmax], 'k--')
        ax.set_xlabel(target)
        ax.set_ylabel('%s | %s' % (target, name))

    plt.tight_layout()
    plt.show()


if len(df.index) < n:
    raise ValueError('need at least %d jobs for one window, found %d' % (n, len(df.index)))

# train models at each time window (+ 1 so the last full window is included)
score_rows = []

for i in range(0, len(df.index) - n + 1, stride):
    print('window [%d, %d) of %d jobs' % (i, i + n, len(df.index)))

    # extract window of jobs
    df_window = df.iloc[i:(i + n)]

    # extract performance dataset
    X, y, columns, _ = create_dataset(df_window, inputs, target)

    # visualize queue time vs each input (with feature importances)
    if plot_inputs:
        plot_input_importances(df_window, X, y, inputs, target)

    # evaluate each model on dataset
    y_preds = {}

    for name, model in models:
        scores, y_bar, _ = evaluate_cv(model, X, y)

        score_rows.append({
            'window_start': i,
            'median': df_window[target].median(),
            'name': name,
            'mae': np.mean(scores['mae']),
            'mpe': np.mean(scores['mpe'])
        })
        y_preds[name] = y_bar

    # plot expected vs predicted target values
    plot_predictions(y, y_preds, target)

# collect scores into a table (one row per window and model)
df_scores = pd.DataFrame(score_rows)

# visualize prediction error over time; the median queue time of each window
# is drawn once per window for scale
window_medians = df_scores.drop_duplicates('window_start')

fig, ax = plt.subplots(figsize=(12, 4))
sns.lineplot(x='window_start', y='mae', hue='name', data=df_scores, ax=ax)
ax.plot(window_medians['window_start'], window_medians['median'], 'r--', label='median')
ax.set_xlabel('window start (job index)')
ax.set_ylabel('mae')
ax.set_title('Prediction error (mae) per window')
ax.legend()
plt.show()

fig, ax = plt.subplots(figsize=(12, 4))
sns.lineplot(x='window_start', y='mpe', hue='name', data=df_scores, ax=ax)
# axhline spans the axes without changing the x-limits
ax.axhline(100, color='r', linestyle='--', label='100 %')
ax.set_xlabel('window start (job index)')
ax.set_ylabel('mpe')
ax.set_title('Prediction error (mpe) per window')
ax.legend()
plt.show()