In [49]:
import json
import math
import os
import random
import string
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.stats as stats

# NOTE: blanket suppression also hides pandas' SettingWithCopyWarning; narrow it if debugging.
warnings.filterwarnings("ignore")

# --- configuration ---------------------------------------------------------------
cluster = 'uranus'
# CPU cores per node for each cluster; used to cap the CPUs given to a single worker.
num_cpus = {'venus': 48, 'uranus': 64, 'earth': 48, 'saturn': 64}
# Seed every RNG used below (np.random for dataset sizes, random for IDs / ETags) so
# that Restart & Run All regenerates the same workload file.
SEED = 0
random.seed(SEED)
np.random.seed(SEED)

# --- load ------------------------------------------------------------------------
raw_data = pd.read_csv(f"./data/{cluster}/cluster_log.csv")
raw_data.head()

Unnamed: 0,job_id,user,vc,gpu_num,cpu_num,node_num,state,submit_time,start_time,end_time,duration,queue
0,1380066,uVCWx,vcrV2,1,1,1,TIMEOUT,2020-03-22 10:38:56,2020-03-22 10:38:56,2020-04-05 10:39:21,1209625,0
1,1381347,ulqKP,vcG8f,8,1,1,CANCELLED,2020-03-23 01:25:21,2020-03-23 01:27:20,2020-04-01 17:34:54,835654,119
2,1381352,ulqKP,vcG8f,8,1,1,CANCELLED,2020-03-23 01:32:22,2020-03-23 01:34:04,2020-04-01 18:06:35,837151,102
3,1381353,ulqKP,vcG8f,8,1,1,CANCELLED,2020-03-23 01:35:13,2020-03-23 01:35:13,2020-04-01 18:06:43,837090,0
4,1381358,ulqKP,vcG8f,8,1,1,CANCELLED,2020-03-23 01:36:42,2020-03-23 01:36:42,2020-04-01 18:06:47,837005,0


In [50]:
# Clean data: keep completed GPU jobs that ran for more than a minute.
# Each comparison must be parenthesised: `&` binds tighter than `>`, so an
# unparenthesised `... & raw_data['gpu_num'] > 0` evaluates `(mask & gpu_num) > 0`,
# a bitwise AND with gpu_num that silently drops every job with an even GPU count.
is_kept = (
    (raw_data['state'] == "COMPLETED")
    & (raw_data['duration'] > 60)
    & (raw_data['gpu_num'] > 0)
)
kept_columns = ['job_id', 'user', 'gpu_num', 'cpu_num', 'submit_time',
                'start_time', 'end_time', 'duration', 'queue']

# Build a fresh frame (no in-place mutation of a slice of raw_data).
# start_time / end_time deliberately stay strings: they are written to JSON verbatim later.
data = (
    raw_data.loc[is_kept, kept_columns]
    .assign(submit_time=lambda frame: pd.to_datetime(frame['submit_time']))
    # Chronological order is required by the merge step; a stable sort keeps ties deterministic.
    .sort_values(by='submit_time', kind='stable')
)

assert (data['gpu_num'] > 0).all()
data.head()

Unnamed: 0,job_id,user,gpu_num,cpu_num,submit_time,start_time,end_time,duration,queue
77,1398486,uA4Ms,1,1,2020-03-30 12:57:12,2020-03-30 12:57:12,2020-04-01 14:04:36,176844,0
91,1400611,uA4Ms,1,1,2020-03-30 20:09:43,2020-03-30 20:09:43,2020-04-01 02:52:09,110546,0
105,1402457,uA4Ms,1,1,2020-03-31 00:55:37,2020-03-31 02:11:16,2020-04-01 15:14:57,133421,4539
107,1402561,uA4Ms,1,1,2020-03-31 01:17:42,2020-03-31 02:15:17,2020-04-02 07:51:30,192973,3455
119,1403830,u0jb4,1,1,2020-03-31 10:44:34,2020-03-31 10:44:34,2020-04-01 08:00:29,76555,0


In [51]:
"""
Simulate a dataset size for every job.

Assumption: a job's (per-worker) duration correlates with the size of the dataset it
processes. In deep-learning workloads, larger datasets usually take longer to process.

For each job we build a truncated exponential distribution over the candidate dataset
sizes (10, 20, ..., 1000), with probability proportional to exp(-rate * x), where x is
the candidate size rescaled to [0, 1]. Because exp(-rate * x) decreases in x, a *larger*
rate concentrates the mass on *smaller* sizes. So that longer jobs tend towards larger
datasets, the rate must decrease with duration: rate = 1 - normalized_duration. The
longest job gets rate 0 (uniform over all sizes). The shortest job gets rate 1, which
skews it towards small sizes.

This is still a simplified assumption and cannot capture the full relationship between
task duration and dataset size. In practice, duration is also affected by many other
factors, such as task complexity, hardware performance and network bandwidth.
"""

# Per-worker duration: divide by the GPU count (or by the CPU count for CPU-only jobs).
workers_per_job = data['gpu_num'].where(data['gpu_num'] != 0, data['cpu_num'])
durations = (data['duration'] / workers_per_job).tolist()

# Candidate dataset sizes and their [0, 1]-rescaled form (loop-invariant, computed once).
dataset_size_range = np.arange(10, 1010, 10)
size_span = dataset_size_range.max() - dataset_size_range.min()
norm_dataset_size_range = (dataset_size_range - dataset_size_range.min()) / size_span

min_duration = min(durations, default=0.0)
max_duration = max(durations, default=0.0)
duration_span = max_duration - min_duration

simulated_size = []
for dur in durations:
    # Rescale the duration to [0, 1]; guard against every job having the same duration.
    norm_duration = (dur - min_duration) / duration_span if duration_span > 0 else 0.0

    # A smaller rate shifts probability towards larger sizes, so longer jobs get a smaller rate.
    lambda_param = 1.0 - norm_duration
    weights = np.exp(-lambda_param * norm_dataset_size_range)
    # The weights are not a probability distribution until normalised to sum to 1.
    prob_distribution = weights / weights.sum()

    # Sample the size itself rather than its rescaled value: de-normalising a float
    # could land slightly off a multiple of 10. int() also yields a plain Python int,
    # which json.dump can serialise.
    simulated_size.append(int(np.random.choice(dataset_size_range, p=prob_distribution)))

def _new_chunk(size):
    """Return a chunk descriptor of the given size with a random 10-char ETag, not yet placed anywhere."""
    return {
        'ETag': ''.join(random.choices(string.ascii_uppercase + string.digits, k=10)),
        'Size': size,
        'ChunkSize': size,
        'ExistOnSSD': False,
        'ExistOnHDD': False,
        'Location': None,
        'SourceLocation': None
    }


def _split_into_chunks(dataset_size, num_chunks=10):
    """Split ``dataset_size`` into chunks of ``dataset_size // num_chunks``.

    The last chunk holds any remainder, so chunk sizes always sum to ``dataset_size``.
    A dataset smaller than ``num_chunks`` becomes a single chunk; a size of 0 gives none.
    """
    chunk_size = dataset_size // num_chunks
    if chunk_size <= 0:
        # A zero chunk size would never advance (infinite loop); use one whole chunk instead.
        return [_new_chunk(dataset_size)] if dataset_size > 0 else []
    num_full, remainder = divmod(dataset_size, chunk_size)
    sizes = [chunk_size] * int(num_full)
    if remainder > 0:
        # Only the leftover goes in the last chunk: no full chunk overshoots the dataset.
        sizes.append(remainder)
    return [_new_chunk(size) for size in sizes]


datasources = [
    {'dataset_size': dataset_size, 'chunks': _split_into_chunks(dataset_size)}
    for dataset_size in simulated_size
]

data['datasource'] = datasources

In [None]:
# Group consecutive submissions into deployments. A new group starts whenever the
# submitting user changes, or when more than MERGE_WINDOW_SECONDS have passed since
# the previous submission. This relies on rows being sorted by submit_time.
MERGE_WINDOW_SECONDS = 300
data['time_difference'] = data['submit_time'].diff().dt.total_seconds().fillna(0)
data['same_user'] = data['user'].eq(data['user'].shift())
data['merge_group'] = ((data['time_difference'] > MERGE_WINDOW_SECONDS) | ~data['same_user']).cumsum()

merged_df = data.groupby('merge_group')

workloads = []
for _, group in merged_df:
    first = group.iloc[0]
    deploy = {
        'deploy_id': ''.join(random.choices(string.ascii_uppercase + string.digits, k=7)),
        'user': first['user'],
        'submit_time': first['submit_time'].strftime("%Y-%m-%d %H:%M:%S"),
        # All jobs in a deployment share the dataset simulated for its first job.
        'datasource': first['datasource'],
    }
    jobs = []
    for _, row in group.iterrows():
        # One single-GPU worker per GPU. A GPU cannot be divided, so a worker's CPUs
        # cannot exceed one node's core count. The count is also floored at 1: integer
        # truncation would otherwise give 0-CPU workers for jobs with fewer CPUs than
        # GPUs (e.g. 1 CPU for 8 GPUs).
        cpu_per_worker = max(1, int(min(row['cpu_num'] / row['gpu_num'], num_cpus[cluster])))
        jobs.append({
            'job_id': ''.join(random.choices(string.ascii_uppercase + string.digits, k=7)),
            'cpu': cpu_per_worker,
            'gpu': 1,
            'numWorkers': row['gpu_num'],
            'storage': deploy['datasource']['dataset_size'],
            'start_time': row['start_time'],
            'end_time': row['end_time'],
            'queue': row['queue'],
            'duration': row['duration'],
            'location': []
        })
    deploy['jobs'] = jobs
    workloads.append(deploy)

# Every cleaned job must land in exactly one deployment.
assert sum(len(w['jobs']) for w in workloads) == len(data)

output_path = f'./cluster/{cluster}/{cluster}.json'
os.makedirs(os.path.dirname(output_path), exist_ok=True)  # output folder may not exist yet
with open(output_path, 'w') as f:
    json.dump(workloads, f, indent=4)