In [1]:
import os
import csv
import json
import time
import ciso8601
import pandas as pd
from datetime import datetime

"""
Total Machine Numbers:  552
Total GPU Numbers:  2490 ×
2 GPU Machine(12GB) Numbers:  321 ×
8 GPU Machine(24GB) Numbers:  231 ×


************* Revised *****************

Total Machine Numbers:  552 - 13
Total GPU Numbers:  3205
8 GPU Machine Numbers:  264
4 GPU Machine Numbers:  271
3 GPU Machine Numbers:  1
2 GPU Machine Numbers:  3
0 GPU Machine Numbers:  13

Number of total jobs: 117325
Number of no recorded attempt jobs:  5236
Number of no recorded start jobs:  136
Number of no recorded end jobs:  6695

15 VC with "ip" nums [409, 412, 409, 388, 414, 226, 399, 386, 389, 389, 301, 85, 10, 5, 12]
"""

"""Constants"""
LOGDIR = './data'
DATE_FORMAT_STR = '%Y-%m-%d %H:%M:%S'
TOTAL_GPU_NUM = 3205
TOTAL_Machine_NUM = 552 - 13
LOG_START = '2017-08-14 23:27:00'  # '2017-08-07 10:03:01'
LOG_END = '2017-12-22 08:03:22'
START_TS = ciso8601.parse_datetime(LOG_START).timestamp()
END_TS = ciso8601.parse_datetime(LOG_END).timestamp()
NUM_Time_Interval = int((END_TS - START_TS)/60)+1

HEADERS = ['Timestamp', 'Total_GPU_Num', 'Idle_GPU_Num', 'GPUJob_Num',
           'Running_GPUJob_Num', 'Pending_GPUJob_Num', 'Running_GPU_Num', 'Pending_GPU_Num',
           'Pending_Morethan_8GPUJob_Num', 'Pending_8GPUJob_Num', 'Pending_4GPUJob_Num', 'Pending_2GPUJob_Num', 'Pending_1GPUJob_Num', 'Pending_Lessthan_8GPUJob_Num',
           'Idle_Machine_8GPU', 'Idle_Machine_5_7GPU', 'Idle_Machine_4GPU', 'Idle_Machine_3GPU', 'Idle_Machine_2GPU', 'Idle_Machine_1GPU']


"""Data Load and Preprocessing"""
cluster_job_log_path = os.path.join(LOGDIR, 'cluster_job_log')
cluster_machine_list_path = os.path.join(LOGDIR, 'revised_machine_list.csv')
with open(cluster_job_log_path, 'r') as f:
    data = json.load(f)
machine_df = pd.read_csv(cluster_machine_list_path)

machine_df['idle_gpu_num'] = machine_df['number of GPUs']
machine_df['deploy_times'] = 0
machine_df.index = machine_df['machineId']
machine_df = machine_df.drop(columns=['Unnamed: 0', 'machineId'])

# Original machine list preprocessing
# cluster_machine_list_path = os.path.join(LOGDIR, 'cluster_machine_list')
# machine_df['number of GPUs'] = machine_df[' number of GPUs']
# machine_df['idle_gpu_num'] = machine_df[' number of GPUs']
# machine_df['deploy_times'] = 0
# machine_df.index = machine_df['machineId']
# machine_df = machine_df.drop(columns=[' number of GPUs', ' single GPU mem', 'machineId'])

In [2]:
cluster_machine_list_path = os.path.join(LOGDIR, 'revised_machine_list.csv') #'cluster_machine_list'
with open(cluster_job_log_path, 'r') as f:
    data = json.load(f)
machine_df = pd.read_csv(cluster_machine_list_path)

machine_df['idle_gpu_num'] = machine_df['number of GPUs']
machine_df['deploy_times'] = 0
machine_df.index = machine_df['machineId']
machine_df = machine_df.drop(columns=['Unnamed: 0', 'machineId'])

In [3]:
class ClusterItem:
    """Time Sequence Items"""

    def __init__(self, key):
        self._timestamp = str(datetime.fromtimestamp(START_TS + key * 60))
        self._total_gpu_number = TOTAL_GPU_NUM
        self._gpujob_number = 0
        self._running_gpujob_number = 0
        self._pending_gpujob_number = 0
        self._running_gpu_number = 0
        self._pending_gpu_number = 0
        self._idle_gpu_number = 0
        self._pending_8gpujob_number = 0
        self._pending_4gpujob_number = 0
        self._pending_2gpujob_number = 0
        self._pending_1gpujob_number = 0
        self._pending_more8_gpujob_number = 0  # Request more than 8 GPU jobs
        self._pending_less8_gpujob_number = 0  # Refer to 3, 5, 6, 7 GPU jobs
        self._idle_machine_8GPU = 0
        self._idle_machine_5_7GPU = 0
        self._idle_machine_4GPU = 0
        self._idle_machine_3GPU = 0
        self._idle_machine_2GPU = 0
        self._idle_machine_1GPU = 0
        self._running_machines = []
        self._running_inside_machines = []

    def tuple(self):
        return (
            self._timestamp,
            self._total_gpu_number,
            self._total_gpu_number - self._running_gpu_number,
            self._gpujob_number,
            self._running_gpujob_number,
            self._pending_gpujob_number,
            self._running_gpu_number,
            self._pending_gpu_number,
            self._pending_more8_gpujob_number,
            self._pending_8gpujob_number,
            self._pending_4gpujob_number,
            self._pending_2gpujob_number,
            self._pending_1gpujob_number,
            self._pending_less8_gpujob_number,
            self._idle_machine_8GPU,
            self._idle_machine_5_7GPU,
            self._idle_machine_4GPU,
            self._idle_machine_3GPU,
            self._idle_machine_2GPU,
            self._idle_machine_1GPU,
        )

In [4]:
"""Parse Data into Job List"""
num_submit_jobs = len(data)
num_no_attempt_jobs = 0
num_no_start_jobs = 0
num_no_end_jobs = 0
joblist, vc = [], {}
for job in data:
    if len(job['attempts']) == 0:
        num_no_attempt_jobs += 1
        continue
    for amp in range(len(job['attempts'])):
        j = []
        if job['attempts'][amp]['start_time'] == 'None':
            num_no_start_jobs += 1
            continue
        if job['attempts'][amp]['end_time'] == 'None':  # !!!!!!!!!!!!!!!!
            num_no_end_jobs += 1
            continue
        if amp > 0 and job['attempts'][amp-1]['end_time'] == 'None':
            continue
        j.append(job['jobid']+'-attempt'+str(amp))
        j.append(job['vc'])
        if not job['vc'] in vc:
            vc[job['vc']] = set()
        j.append(job['user'])
        j.append(job['status'])  # !!!!!!!!!!!!!!!
        if amp == 0:
            j.append(job['submitted_time'])
            j.append(job['attempts'][amp]['start_time'])
            j.append(job['attempts'][amp]['end_time'])
        else:
            j.append(job['attempts'][amp-1]['end_time'])
            j.append(job['attempts'][amp]['start_time'])
            j.append(job['attempts'][amp]['end_time'])
        node_list, gpu_num = [], []
        for g in job['attempts'][amp]['detail']:
            machine_df.at[g['ip'], 'deploy_times'] += 1
            gpu_num.append(len(g['gpus']))
            node_list.append(g['ip'])
            vc[job['vc']].add(g['ip'])
        j.append(sum(gpu_num))
        j.append(gpu_num)
        j.append(node_list)
        joblist.append(j)

# print('Number of no recorded attempt jobs: ', num_no_attempt_jobs)
# print('Number of no recorded start jobs: ', num_no_start_jobs)
# print('Number of no recorded end jobs: ', num_no_end_jobs)

In [5]:
"""Create Time Sequence List"""
cluster_data = []
for i in range(NUM_Time_Interval):
    cluster_data.append(ClusterItem(i))

ss = time.perf_counter()

"""From Job Log to Time Sequence"""
for job in joblist:
    submit = int((ciso8601.parse_datetime(job[4]).timestamp() - START_TS)/60)
    start = int((ciso8601.parse_datetime(job[5]).timestamp() - START_TS)/60)
    end = int((ciso8601.parse_datetime(job[6]).timestamp() - START_TS)/60)
    for t in range(submit, start+1):
        cluster_data[t]._gpujob_number += 1
        cluster_data[t]._pending_gpujob_number += 1
        cluster_data[t]._pending_gpu_number += int(job[7])
        if int(job[7]) == 1:
            cluster_data[t]._pending_1gpujob_number += 1
        elif int(job[7]) == 2:
            cluster_data[t]._pending_2gpujob_number += 1
        elif int(job[7]) == 4:
            cluster_data[t]._pending_4gpujob_number += 1
        elif int(job[7]) == 8:
            cluster_data[t]._pending_8gpujob_number += 1
        elif int(job[7]) > 8:
            cluster_data[t]._pending_more8_gpujob_number += 1
        else:
            cluster_data[t]._pending_less8_gpujob_number += 1

    for t in range(start, end+1):
        cluster_data[t]._gpujob_number += 1
        cluster_data[t]._running_gpujob_number += 1
        cluster_data[t]._running_gpu_number += int(job[7])
        cluster_data[t]._running_machines
        cluster_data[t]._running_machines.append(job[-1])
        cluster_data[t]._running_inside_machines.append(job[-2])

ee = time.perf_counter()
print('Time Usage:', round(ee-ss, 2))

Time Usage: 40.13


In [6]:
ss = time.perf_counter()

for t in range(NUM_Time_Interval):#NUM_Time_Interval
    
    assert len(cluster_data[t]._running_machines) == len(cluster_data[t]._running_inside_machines)
    
    machines = machine_df.copy()
    for k in range(len(cluster_data[t]._running_machines)):
        for i in range(len(cluster_data[t]._running_machines[k])):
            machines.at[cluster_data[t]._running_machines[k][i], 'idle_gpu_num'] -= cluster_data[t]._running_inside_machines[k][i]
    machine_state = machines['idle_gpu_num'].value_counts()
    for i in range(9):
        if i not in machine_state.index:
            machine_state[i] = 0
    cluster_data[t]._idle_machine_8GPU = machine_state[8]
    cluster_data[t]._idle_machine_4GPU = machine_state[4]
    cluster_data[t]._idle_machine_2GPU = machine_state[2]
    cluster_data[t]._idle_machine_1GPU = machine_state[1]
    cluster_data[t]._idle_machine_3GPU = machine_state[3]
    cluster_data[t]._idle_machine_5_7GPU = machine_state[5] + machine_state[6] + machine_state[7]      

ee = time.perf_counter()
print('Time Usage:', round(ee-ss, 2))
    
with open('./timeseq.csv', 'w') as cluster:
    writer = csv.writer(cluster)
    writer.writerow(HEADERS)
    for i in cluster_data:
        writer.writerow(i.tuple())

Time Usage: 693.09


In [7]:
"""Job List Sample"""

TEST_INDEX = 67863
capacity = []
for k in range(len(cluster_data[TEST_INDEX]._running_machines)):
    ls = []
    for i in range(len(cluster_data[TEST_INDEX]._running_machines[k])):
        ls.append(machine_df.at[cluster_data[TEST_INDEX]._running_machines[k][i], 'number of GPUs'])
    capacity.append(ls)

d = {'machine':cluster_data[TEST_INDEX]._running_machines, 'capacity': capacity, 'gpu_deploy':cluster_data[TEST_INDEX]._running_inside_machines}
df = pd.DataFrame(data=d)
df[20:50]

Unnamed: 0,machine,capacity,gpu_deploy
20,[m404],[8],[8]
21,[m178],[4],[1]
22,[m356],[8],[4]
23,[m171],[8],[4]
24,"[m168, m190]","[8, 8]","[4, 4]"
25,[m255],[4],[1]
26,[m146],[8],[2]
27,[m297],[8],[8]
28,[m74],[4],[1]
29,[m75],[4],[1]
