In [1]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
from tqdm import tqdm

C:\Users\User\anaconda3\lib\site-packages\numpy\.libs\libopenblas.GK7GX5KEQ4F6UYO3P26ULGBQYHGQO7J4.gfortran-win_amd64.dll
C:\Users\User\anaconda3\lib\site-packages\numpy\.libs\libopenblas.PYQHXLVVQ7VESDPUVUADXEVJOBGHJPAY.gfortran-win_amd64.dll


In [2]:
def read_swf_file(file_path):
    """Load an SWF workload trace into a DataFrame.

    Text after ';' is treated as a comment, so the ';'-prefixed header lines
    are skipped. The remaining whitespace-separated fields are mapped onto the
    18 column names listed in ``col_names``.

    Parameters
    ----------
    file_path : str or path-like
        Path to the ``.swf`` file.

    Returns
    -------
    pandas.DataFrame
        One row per job record, with columns named as in ``col_names``.
    """
    col_names = ['job_number', 'submit_time', 'wait_time', 'run_time', 'num_procs',
                 'avg_cpu_time', 'used_memory', 'req_procs', 'req_time', 'req_memory',
                 'status', 'user_id', 'group_id', 'exec_id', 'queue_id',
                 'partition_id', 'orig_site', 'last_run_site']
    # `sep=r'\s+'` is the documented equivalent of `delim_whitespace=True`,
    # which pandas 2.2 deprecated (FutureWarning) and schedules for removal.
    return pd.read_csv(file_path, comment=';', header=None, names=col_names, sep=r'\s+')

In [101]:
# Load the SDSC-SP2-1998 trace and discard the columns that are not used below.
df = read_swf_file('SDSC-SP2-1998-4.2-cln.swf').drop(columns=[
    'used_memory', 'req_memory', 'partition_id', 'orig_site', 'last_run_site',
    'num_procs', 'avg_cpu_time', 'job_number',
])

In [120]:
# Keep only queue-3 jobs and renumber them 0..n-1. A run_time of -1 becomes 0,
# and submit times are re-based so the earliest kept submission is at t = 0.
df = df.loc[df['queue_id'] == 3].drop(columns=['queue_id'])
df.index = np.arange(len(df))
df['run_time'] = df['run_time'].mask(df['run_time'] == -1, 0)
earliest_submit = df['submit_time'].min()
df['submit_time'] = df['submit_time'] - earliest_submit

In [121]:
# Users with more than 600 jobs get their own code 1..k, in value_counts order
# (most frequent first). Every other user shares code 0.
user_counts = df.user_id.value_counts()
best_users = user_counts[user_counts > 600].index.values
user_code = {uid: code for code, uid in enumerate(best_users, start=1)}
user_enc = df.user_id.map(user_code).fillna(0).to_numpy(dtype=float)

In [122]:
# Groups with more than 900 jobs get their own code 1..k, in value_counts order
# (most frequent first). Every other group shares code 0.
group_counts = df.group_id.value_counts()
best_groups = group_counts[group_counts > 900].index.values
group_code = {gid: code for code, gid in enumerate(best_groups, start=1)}
group_enc = df.group_id.map(group_code).fillna(0).to_numpy(dtype=float)

In [123]:
# Executables with more than 110 jobs get their own code 1..k, in value_counts
# order (most frequent first). Every other executable shares code 0.
exec_counts = df.exec_id.value_counts()
best_execs = exec_counts[exec_counts > 110].index.values
exec_code = {eid: code for code, eid in enumerate(best_execs, start=1)}
exec_enc = df.exec_id.map(exec_code).fillna(0).to_numpy(dtype=float)

In [124]:
# Derived timing columns, integer category codes in place of the raw ids, and
# the slack between requested and actual run time.
# NOTE(review): submit_time was re-based to the queue-3 minimum in an earlier
# cell. If 893466664 is the trace's UnixStartTime, date_time is shifted by that
# minimum; confirm against the SWF header.
df = df.assign(
    date_time=pd.to_datetime(df['submit_time'] + 893466664, unit='s'),
    start_time=lambda d: d['submit_time'] + d['wait_time'],
    finish_time=lambda d: d['start_time'] + d['run_time'],
    user_enc=user_enc.astype(int),
    group_enc=group_enc.astype(int),
    exec_enc=exec_enc.astype(int),
).drop(columns=['user_id', 'group_id', 'exec_id'])

# Requested times below 1 (e.g. -1 placeholders) are clamped to 0 before the slack is computed.
df = df.assign(
    req_time=lambda d: d['req_time'].mask(d['req_time'] < 1, 0),
    extra_time=lambda d: d['req_time'] - d['run_time'],
)

In [125]:
# Names of the per-code count features, one per encoding code that actually
# occurs. Deriving them from the observed codes (rather than range(NUM_*))
# keeps them correct when the codes are not exactly 0..k-1, e.g. when no row
# falls into the shared "other" code 0.
user_codes = sorted(df.user_enc.unique())
group_codes = sorted(df.group_enc.unique())
exec_codes = sorted(df.exec_enc.unique())
NUM_USERS = len(user_codes)
NUM_GROUPS = len(group_codes)
NUM_EXECS = len(exec_codes)
user_enc_cols = [f'user_enc_{code}' for code in user_codes]
group_enc_cols = [f'group_enc_{code}' for code in group_codes]
exec_enc_cols = [f'exec_enc_{code}' for code in exec_codes]

In [126]:
# Lookback windows expressed in seconds ("month" means 30 days).
week_seconds = 7 * 24 * 60 * 60
month_seconds = 30 * 24 * 60 * 60

In [11]:
def get_datasets(idx, df):
    """Describe the cluster state at the moment job `idx` is submitted.

    Parameters
    ----------
    idx
        Row label of the job in `df`.
    df : pandas.DataFrame
        Job table. It must contain submit_time, wait_time, run_time,
        start_time, finish_time, req_time, req_procs, extra_time, status and
        the user_enc / group_enc / exec_enc columns.

    Returns
    -------
    list of pandas.DataFrame
        ``[task, finished_features, waiting_features, working_features]``.
        - ``task`` is a one-row frame (index [0]) with the job's own
          wait_time, req_procs, req_time and *_enc values.
        - The other three are the outputs of process_finished_tasks,
          process_waiting_tasks and process_working_tasks, with column names
          prefixed 'f_', 'wait_' and 'work_' respectively.
    """
    now = df.at[idx, 'submit_time']

    # Jobs that had finished by `now` and were submitted less than a week earlier.
    finished_tasks = df[(df.finish_time <= now) & (now - df.submit_time < week_seconds)]

    # Jobs submitted strictly before `now` that had not started yet. Columns
    # describing how the job eventually ran are dropped.
    waiting_tasks = df[(df.start_time > now) & (df.submit_time < now)].copy()
    waiting_tasks = waiting_tasks.drop(columns=['run_time', 'wait_time', 'finish_time', 'start_time', 'extra_time', 'status'])
    # Computed on the filtered rows only. Subtracting over all of `df` and
    # relying on index alignment gives the same values, but costs a full-length
    # operation on every call.
    waiting_tasks['cur_time_wait'] = now - waiting_tasks.submit_time

    # Jobs that started strictly before `now` and were still running at `now`.
    working_tasks = df[(df.start_time < now) & (df.finish_time >= now)].copy()
    working_tasks = working_tasks.drop(columns=['run_time', 'finish_time', 'status', 'extra_time'])
    time_working = now - working_tasks.start_time
    # Remaining time according to the requested limit (req_time), not the actual run_time.
    working_tasks['time_left'] = working_tasks.req_time - time_working

    # List-based .loc keeps a one-row DataFrame with each column's own dtype.
    # Going through df.loc[idx] (a Series) and transposing upcasts every column
    # to one common dtype: object when df also holds the datetime column.
    task = df.loc[[idx], ['wait_time', 'req_procs', 'req_time', 'user_enc', 'group_enc', 'exec_enc']]
    task = task.reset_index(drop=True)

    waiting_features = process_waiting_tasks(waiting_tasks).add_prefix('wait_')
    working_features = process_working_tasks(working_tasks).add_prefix('work_')
    finished_features = process_finished_tasks(finished_tasks).add_prefix('f_')

    return [task, finished_features, waiting_features, working_features]

In [59]:
def count_base_stats(name, data):
    """Summary statistics of column `name` in `data`.

    Returns a dict with keys 'max_<name>', 'min_<name>', 'mean_<name>',
    'median_<name>' and 'sum_<name>', in that order.
    """
    column = data[name]
    return {f'{stat}_{name}': getattr(column, stat)()
            for stat in ('max', 'min', 'mean', 'median', 'sum')}

def count_cat_features(name, data):
    """Occurrence count of each distinct value of column `name` in `data`.

    Keys are '<name>_<value>', ordered as `value_counts()` returns them
    (most frequent first). An empty `data` yields an empty dict.
    """
    tally = data[name].value_counts()
    return {f'{name}_{value}': count for value, count in zip(tally.index, tally.to_numpy())}

In [60]:
def process_waiting_tasks(waiting_tasks):
    """Aggregate the queue of waiting jobs into a one-row feature DataFrame.

    Columns, in order:
    - zero-initialised '<user|group|exec>_enc_<code>' counters (names taken
      from the module-level *_enc_cols lists), overwritten by the observed
      per-code counts;
    - 'number' (row count);
    - max/min/mean/median/sum of cur_time_wait, req_time and req_procs.
    """
    row = {col: 0 for col in user_enc_cols + group_enc_cols + exec_enc_cols}
    row['number'] = len(waiting_tasks)
    for code_col in ('user_enc', 'group_enc', 'exec_enc'):
        row.update(count_cat_features(code_col, waiting_tasks))
    for numeric_col in ('cur_time_wait', 'req_time', 'req_procs'):
        row.update(count_base_stats(numeric_col, waiting_tasks))
    return pd.DataFrame({key: [value] for key, value in row.items()})

def process_working_tasks(working_tasks):
    """Aggregate the currently running jobs into a one-row feature DataFrame.

    Columns, in order:
    - zero-initialised '<user|group|exec>_enc_<code>' counters (names taken
      from the module-level *_enc_cols lists);
    - 'number' (row count);
    - max/min/mean/median/sum of time_left, wait_time, req_time and req_procs.
    The observed per-code counts are applied last and overwrite the matching
    zero counters.
    """
    row = {col: 0 for col in user_enc_cols + group_enc_cols + exec_enc_cols}
    row['number'] = len(working_tasks)
    for numeric_col in ('time_left', 'wait_time', 'req_time', 'req_procs'):
        row.update(count_base_stats(numeric_col, working_tasks))
    for code_col in ('user_enc', 'group_enc', 'exec_enc'):
        row.update(count_cat_features(code_col, working_tasks))
    return pd.DataFrame({key: [value] for key, value in row.items()})

def process_finished_tasks(finished_tasks):
    """Summarise recently finished jobs as a one-row DataFrame.

    Columns: 'number' (row count), 'mean_wait_time', 'median_wait_time'.
    The mean and median are NaN when `finished_tasks` is empty.
    """
    waits = finished_tasks['wait_time']
    return pd.DataFrame({
        'number': [finished_tasks.shape[0]],
        'mean_wait_time': [waits.mean()],
        'median_wait_time': [waits.median()],
    })

In [97]:
def create_all_features(idx, df=None):
    """Return a single-row DataFrame with every feature for job `idx`.

    The frames returned by get_datasets are concatenated side by side, and
    their column names are kept as-is.

    Parameters
    ----------
    idx
        Row label of the job in `df`.
    df : pandas.DataFrame, optional
        Job table to compute features against. When omitted, the notebook's
        global `df` is looked up at call time.
    """
    if df is None:
        # Resolve the global at call time. A `df=df` default is evaluated once,
        # when this cell runs, and silently keeps using that frame after `df`
        # is rebuilt by later cells.
        df = globals()['df']
    return pd.concat(get_datasets(idx, df), axis=1)

In [88]:
# Upper bound on wait_time (same units as the wait_time column) used to filter
# jobs for the second feature run. The misspelled name is kept because later
# cells refer to `thresold`.
thresold = 1_830_008

In [62]:
# One feature row per job, skipping the first 23 rows of df. The result is
# indexed by the original job labels.
rows = df[23:]
features = pd.concat([create_all_features(i) for i in tqdm(rows.index)])
features.index = rows.index

100%|██████████| 30118/30118 [06:51<00:00, 73.26it/s]


In [64]:
pd.to_pickle(features, 'features.pkl')  # cache the expensive feature table

In [96]:
mask = df['wait_time'].lt(thresold)  # True for jobs that waited less than the threshold

In [98]:
# Rebuild the feature table using only jobs whose wait_time is below
# `thresold`, skipping the first 50 of them, and cache it to disk.
# The filtered frame is built once, outside the loop, instead of re-evaluating
# `df[mask]` (a full boolean filter of df) for every job.
df_masked = df[mask]
masked_rows = df_masked[50:]
features_2 = pd.concat([create_all_features(i, df_masked) for i in tqdm(masked_rows.index)])
features_2.index = masked_rows.index
features_2.to_pickle('features_2.pkl')

100%|██████████| 30061/30061 [14:48<00:00, 33.83it/s]


In [99]:
# plt.figure(figsize=(20, 5))
# num = 1000
# plt.bar(np.arange(num), sorted(df.wait_time)[-num:])
# plt.xlim([-0.5, num - 0.5])