In [4]:
import os.path

import pandas as pd
import numpy as np
import random

In [5]:
DATA_DIR = "/hydra/datasets"
# 从给定的csv文件中加载数据，并返回pandas数据帧DataFrame。
def get_df(file, header=None):
    df = pd.read_csv(file, header=None)
    # df.columns = DF_HEADER.get(key, df.columns)
    df.columns = pd.read_csv("{}.header".format(file.split('.csv')[0])).columns if header is None else header
    return df

In [6]:
def get_df_one_inst(dft, dfj):
    # 过滤掉太短的任务，它们有可能是inference任务。 返回筛选和处理后的数据帧df_target.
    min_run_time = 1000.
    # 合并job_name相同的dfa和dfj，且仅保留具有相同job_name的行，对于不相同的默认舍弃。
    dfa = dft.merge(dfj, on=['job_name'], suffixes = ['','_j'])
    dfa.loc[dfa.start_time==0, 'start_time'] = np.nan
    # 将start_time==0的行的'start_time'和'end_time'设置为NaN。
    dfa.loc[dfa.start_time==0, 'end_time'] = np.nan
    # 计算差值，计算任务运行时间。
    dfa['runtime'] = dfa.end_time - dfa.start_time
    # 筛选出目标任务数据行，状态为Terminated，GPU类型不为MISC，计划使用的GPU数量为100，实例数量为1.
    df_target = dfa[
        (dfa['status'] == 'Terminated') &
        (dfa['gpu_type'] != 'MISC') &
        (dfa['plan_gpu'] == 100.0) &
        (dfa['inst_num'] == 1.0) &
        (dfa['end_time'] - dfa['start_time'] >= min_run_time) &
        (dfa['task_name'].isin(['tensorflow', 'PyTorchWorker', 'worker']))]
    # 将筛选后的数据帧按start_time_j排序，并计算每个任务与第一个任务的开始时间差值，作为提交时间。
    df_target = df_target.sort_values(['start_time_j'])
    df_target['norm_job_submit_time'] = df_target.start_time_j - df_target.iloc[0]['start_time_j'].item()
    # 去除重复行，只保留第一次出现。
    df_target = df_target.drop_duplicates(['job_name'], keep='first')
    # 打印不同GPU类型的行数
    #print("all", df_target.shape)
    #print("GPU types:", df_target['gpu_type'].unique())
    print("V100 shape", df_target[df_target['gpu_type'] == 'V100M32'].shape)
    #print("V100 shape2", df_target[df_target['gpu_type'] == 'V100'].shape)
    #print("P100 shape", df_target[df_target['gpu_type'] == 'P100'].shape)
    print("T4 shape", df_target[df_target['gpu_type'] == 'T4'].shape)
    # 将V100M32类型替换为V100，
    df_target.loc[df_target.gpu_type == 'V100M32', 'gpu_type'] = 'V100'
    #print("GPU types:", df_target['gpu_type'].unique())
    return df_target

In [7]:
def gen_ddl_and_gpu_runtimes(df_one_inst):
    # 生成一个新的DataFrame，包含每个任务在不同类型的GPU上的运行时间和截止时间。
    gpu_types = [str(t) for t in df_one_inst['gpu_type'].unique()]
    """
    configs
    """
    # 不同GPU的运行时间比例，将任务的标准运行时间转换为特定GPU上的运行时间。
    runtimes = {
        'T4': (1, 1),
        'P100': (1.4, 2.),
        'V100': (2.4, 2.66),
    }
    # 检查runtimes字典中的键是否与gpu_types中的GPU类型匹配。如果不匹配，程序将抛出一个异常。
    assert set(runtimes.keys()) == set(list(gpu_types))
    ddl_ratio = 10 # 截止时间比例
    ddl_range = (1.2, 3.0) # 截止时间范围
    jobs_count = 5000 # 任务数量
    submit_together = True # 任务是否可以一起提交。
    df_one_inst = df_one_inst[['job_name', 'gpu_type', 'runtime', 'norm_job_submit_time']]
    df_one_inst = df_one_inst[:jobs_count]
    # 选取特定的列和行数。
    print(df_one_inst.shape)
    # 打印前100行中的所有列
    print("print", df_one_inst.iloc[0:100, :])
    """
    end configs
    """
    # 用于生成一个任务从from_gpu类型的GPU转到to_gpu类型的GPU时的运行时间。
    def gen_runtime(from_gpu, to_gpu, origin_runtime):
        if from_gpu == to_gpu:
            return origin_runtime
        if from_gpu not in gpu_types:
            print("not in gpu_types:", from_gpu)
        to_rand = random.uniform(*runtimes[to_gpu])
        from_rand = random.uniform(*runtimes[from_gpu])
        return int(origin_runtime * to_rand / from_rand)
    # 用于生成一个基于任务提交时间、运行时间和随机因子的截止时间。
    def gen_ddl(norm_submit_time, runtime):
        if random.randint(0, 100) < ddl_ratio:
            return int(norm_submit_time + runtime * random.uniform(*ddl_range))
        return np.inf
    # 将原始GPU类型映射到新类型。
    fix_gpu = {
        'T4': 'A100',
        'P100': 'GTX2080Ti',
        'V100': 'V100'
    }
    for gpu_type in gpu_types:
        # df.loc[row_labels, column_labels]按标签选择行/列
        df_one_inst.loc[:, fix_gpu[gpu_type]] = df_one_inst.apply(lambda x: gen_runtime(x.gpu_type, gpu_type, x.runtime), axis=1)
        df_one_inst.loc[:, 'ddl'] = df_one_inst.apply(lambda x: gen_ddl(x.norm_job_submit_time, x.runtime), axis=1)
        # axis=1表示按行运行函数，axis=0表示按列运行函数。

    if submit_together:
        df_one_inst = df_one_inst.iloc[np.random.permutation(len(df_one_inst))]
        # 同时提交，则norm_job_submit_time实际值应为0，故应调整ddl的值为ddl-norm_job_submit_time。
        df_one_inst.loc[:, 'ddl'] = df_one_inst.apply(lambda x: x['ddl'] - x['norm_job_submit_time'], axis=1)
        # 因为是同时提交作业，故norm_job_submit_time应设置为0.
        df_one_inst.loc[:, 'norm_job_submit_time'] = df_one_inst.apply(lambda x: 0, axis=1)
    # 修改后的GPU列表类型
    fix_gpu_types = [fix_gpu[t] for t in gpu_types]
    df_output = df_one_inst[['job_name', 'norm_job_submit_time', 'ddl', *fix_gpu_types]]
    # print("output: ", df_output.shape)
    to_csv(df_output, f"case_{jobs_count}_all_{ddl_ratio}_ddl.csv")

In [8]:
def to_csv(df, name):
    df = df.reset_index(drop=True)
    df.to_csv(name)

In [9]:
def main():
    dft = get_df(os.path.join(DATA_DIR, "pai_task_table.csv"))
    dfj = get_df(os.path.join(DATA_DIR, "pai_job_table.csv"))
    df = get_df_one_inst(dft, dfj)
    print(df.head())
    # gen_ddl_and_gpu_runtimes(df)

if __name__ == '__main__':
    main()

V100 shape (3603, 17)
T4 shape (10988, 17)
                        job_name      task_name  inst_num      status  \
273324  34b3d819023ea21e28afd50f     tensorflow       1.0  Terminated   
200005  af3b0f5e810838d6c33840e1     tensorflow       1.0  Terminated   
88185   8b3581e39780f46e28e48c4a  PyTorchWorker       1.0  Terminated   
296351  e3789c5e546bbbe82007127f  PyTorchWorker       1.0  Terminated   
388408  8dc5b00629f823cf8d30cade  PyTorchWorker       1.0  Terminated   

        start_time   end_time  plan_cpu   plan_mem  plan_gpu gpu_type  \
273324    925461.0  1101832.0     600.0  29.296875     100.0       T4   
200005    991185.0  1101105.0     900.0  29.296875     100.0       T4   
88185     991707.0  1232078.0    1800.0  58.593750     100.0     P100   
296351    991774.0  1224062.0    1800.0  58.593750     100.0     P100   
388408    991846.0  1149509.0    1800.0  58.593750     100.0     P100   

                                                  inst_id          user  \
2733

In [49]:

dft = get_df(os.path.join(DATA_DIR, "pai_task_table.csv"))
dfj = get_df(os.path.join(DATA_DIR, "pai_job_table.csv"))
df_one_inst = get_df_one_inst(dft, dfj)

(6278, 17)


In [50]:
gen_ddl_and_gpu_runtimes(df_one_inst)

Empty DataFrame
Columns: [job_name, gpu_type, runtime, norm_job_submit_time]
Index: []
Empty DataFrame
Columns: [job_name, norm_job_submit_time, ddl, T4, P100, V100]
Index: []
