In [1]:
import csv
import datetime
import json
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pdb

## Constants

In [2]:
# Directory holding the trace files read and written by this notebook.
LOGDIR = '../trace-data'
# Timestamp layout expected by parse_date.
DATE_FORMAT_STR = '%Y-%m-%d %H:%M:%S'
MINUTES_PER_DAY = (24 * 60)
# 60 seconds * 1000 milliseconds * 1000 microseconds. The previous value
# (60 * 1000) was the number of *milliseconds* in a minute.
MICROSECONDS_PER_MINUTE = (60 * 1000 * 1000)

## Utility code

In [3]:
def parse_date(date_str):
    """Turns a timestamp string into a datetime, tolerating missing values.

       Args:
           date_str: The timestamp text, laid out as DATE_FORMAT_STR. May
               also be None, the empty string, or the literal 'None' to
               signal a missing timestamp.

       Returns:
           The parsed datetime object, or None when the input is one of
           the missing-value markers.
    """
    is_missing = date_str is None or date_str in ('', 'None')
    if is_missing:
        return None
    parsed = datetime.datetime.strptime(date_str, DATE_FORMAT_STR)
    return parsed

def timedelta_to_minutes(timedelta):
    """Converts a datetime timedelta object to minutes.

       Args:
           timedelta: The timedelta to convert.

       Returns:
           The (possibly fractional, possibly negative) number of minutes
           captured in the timedelta, as a float.
    """
    # total_seconds() already folds days, seconds and microseconds together
    # with the correct scale. The hand-rolled sum divided the microsecond
    # part by 60 * 1000, overstating it by a factor of 1000.
    return timedelta.total_seconds() / 60.0

def round_to_nearest_minute(t):
    """Truncates a datetime object to the start of its minute.

       Despite the name, the value is always rounded *down*: the seconds
       and microseconds components are removed.

       Args:
           t: A datetime object.

       Returns:
           A new datetime object with seconds and microseconds stripped.
    """
    sub_minute_part = datetime.timedelta(seconds=t.second,
                                         microseconds=t.microsecond)
    floored = t - sub_minute_part
    return floored

def add_minute(t):
    """Shifts a datetime object forward by exactly one minute.

       Args:
           t: A datetime object.

       Returns:
           A new datetime object one minute later than t.
    """
    one_minute = datetime.timedelta(minutes=1)
    return t + one_minute

In [4]:
def get_cdf(data):
    """Returns the CDF of the given data.

       Args:
           data: A list of numerical values.

       Returns:
           A pair (x, y) for plotting the CDF: x is the data sorted in
           ascending order and y is a numpy array of cumulative
           percentages running from 0 to 100, one entry per value. A
           single value is reported at 100 percent; empty input yields
           an empty list and an empty array.
    """
    sorted_data = sorted(data)
    n = len(sorted_data)
    if n == 1:
        # The general formula divides 0 by (n - 1) == 0 here, which would
        # yield NaN plus a RuntimeWarning.
        return sorted_data, np.array([100.])
    p = 100. * np.arange(n) / (n - 1)
    return sorted_data, p

In [5]:
class Job:
    """Encapsulates a job."""

    def __init__(self, status, vc, jobid, attempts, submitted_time, user):
        """Records job parameters and computes key metrics.

           Stores the passed in arguments as well as the number of GPUs
           requested by the job. In addition, computes the queueing delay
           as defined as the delta between the submission time and the start
           time of the first attempt. Finally, computes run time as defined
           as the delta between the initial attempt's start time and the last
           attempt's finish time.

           NOTE: Some jobs do not have any recorded attempts, and some attempts
           have missing start and/or end times. A job's latest attempt having no
           end time indicates that the job was still running when the log data
           was collected. Any metric that cannot be computed from the available
           timestamps is stored as None.

           The attempt dicts passed in are not modified: each one is copied
           with its 'start_time' and 'end_time' replaced by parsed datetime
           objects (or None), so the same raw log can be used to build Job
           objects more than once.

           Args:
               status: One of 'Pass', 'Killed', 'Failed'.
               vc: The hash of the virtual cluster id the job was run in.
               jobid: The hash of the job id.
               attempts: A list of dicts, where each dict contains the following keys:
                   'start_time': The start time of the attempt.
                   'end_time': The end time of the attempt.
                   'detail': A list of nested dicts where each dict contains
                             the following keys:
                        'ip': The server id.
                        'gpus': A list of the GPU ids allotted for this attempt.
               submitted_time: The time the job was submitted to the queue.
               user: The user's id.
        """
        self._status = status
        self._vc = vc
        self._jobid = jobid
        # Shallow-copy every attempt rather than overwriting the caller's
        # dicts: parse_date cannot re-parse a value it has already turned
        # into a datetime, so in-place mutation made construction one-shot.
        self._attempts = [
            dict(attempt,
                 start_time=parse_date(attempt['start_time']),
                 end_time=parse_date(attempt['end_time']))
            for attempt in attempts
        ]
        self._submitted_time = parse_date(submitted_time)
        self._user = user

        # Every metric defaults to None and is filled in only when the
        # data it needs is present.
        self._num_gpus = None
        self._run_time = None
        self._queueing_delay = None
        if not self._attempts:
            return

        first_attempt = self._attempts[0]
        last_attempt = self._attempts[-1]
        self._num_gpus = sum(len(detail['gpus'])
                             for detail in first_attempt['detail'])

        first_start = first_attempt['start_time']
        if first_start is None:
            return
        if last_attempt['end_time'] is not None:
            self._run_time = timedelta_to_minutes(
                last_attempt['end_time'] - first_start)
        # Without a submission time the subtraction would raise TypeError.
        if self._submitted_time is not None:
            self._queueing_delay = timedelta_to_minutes(
                first_start - self._submitted_time)

    @property
    def status(self):
        """The job's final status string."""
        return self._status

    @property
    def vc(self):
        """The hash of the virtual cluster id the job ran in."""
        return self._vc

    @property
    def jobid(self):
        """The hash of the job id."""
        return self._jobid

    @property
    def attempts(self):
        """The list of attempt dicts, with parsed start/end times."""
        return self._attempts

    @property
    def submitted_time(self):
        """The parsed submission time, or None if it was missing."""
        return self._submitted_time

    @property
    def user(self):
        """The user's id."""
        return self._user

    @property
    def num_gpus(self):
        """GPU count of the first attempt, or None if there are no attempts."""
        return self._num_gpus

    @property
    def queueing_delay(self):
        """Minutes from submission to first start, or None if unknown."""
        return self._queueing_delay

    @property
    def run_time(self):
        """Minutes from first start to last end, or None if unknown."""
        return self._run_time

In [6]:
def get_bucket_from_num_gpus(num_gpus):
    """Maps GPU count to a bucket for plotting purposes.

       Buckets are 0 for exactly 1 GPU, 1 for 2-4, 2 for 5-8 and 3 for
       more than 8. None is returned for a None count or for any count
       that falls outside every bucket.
    """
    if num_gpus is None:
        return None
    # Closed [low, high] intervals; the position in the tuple is the bucket.
    inclusive_ranges = ((1, 1), (2, 4), (5, 8))
    for bucket, (low, high) in enumerate(inclusive_ranges):
        if low <= num_gpus <= high:
            return bucket
    return 3 if num_gpus > 8 else None
    
def get_plot_config_from_bucket(bucket):
    """Returns plotting configuration information.

       The result is a (label, color, line style) tuple for buckets 0-3,
       and None for any other bucket value.
    """
    configs = (
        (0, ('1', 'green', '-')),
        (1, ('2-4', 'blue', '-.')),
        (2, ('5-8', 'red', '--')),
        (3, ('>8', 'purple', ':')),
    )
    for known_bucket, config in configs:
        if bucket == known_bucket:
            return config
    return None

## Load the cluster log

In [7]:
# Read the job trace from LOGDIR; the file is parsed as JSON.
cluster_job_log_path = os.path.join(LOGDIR, 'cluster_job_log')
with open(cluster_job_log_path, 'r') as f:
    cluster_job_log = json.load(f)
# Every record is unpacked into Job's keyword arguments, so its keys must
# match the parameter names of Job.__init__.
jobs = [Job(**job) for job in cluster_job_log]

In [8]:
# Distinct virtual cluster ids, in order of first appearance in the trace.
vc_list = list(dict.fromkeys(job.vc for job in jobs))
# Show the first job's attributes as a quick check of the parsed data.
jobs[0].__dict__

{'_status': 'Pass',
 '_vc': '11cb48',
 '_jobid': 'application_1506638472019_17235',
 '_attempts': [{'start_time': datetime.datetime(2017, 10, 9, 7, 2, 5),
   'end_time': datetime.datetime(2017, 10, 9, 7, 3, 11),
   'detail': [{'ip': 'm372', 'gpus': ['gpu0']}]}],
 '_submitted_time': datetime.datetime(2017, 10, 9, 7, 1, 55),
 '_user': '066e99',
 '_num_gpus': 1,
 '_run_time': 1.1,
 '_queueing_delay': 0.16666666666666666}

In [9]:
# Per-job table that later cells read utilization columns from.
data = pd.read_csv(f'{LOGDIR}/data.csv')
# Ids of the jobs that have a row in that table.
used_jobids = data['jobid'].to_list()

In [10]:
# Sanity check: does the first job of the trace have a row in `data`?
jobs[0].jobid in data['jobid'].to_list()

True

In [11]:
# Machine inventory, read as CSV.
machine_path = os.path.join(LOGDIR, 'cluster_machine_list')
machine_df = pd.read_csv(machine_path)
# df['single GPU mem'].value_counts()
# NOTE: every column name except 'machineId' carries a leading space
# (' number of GPUs', ' single GPU mem'); lookups must include it.
machine_df.columns

Index(['machineId', ' number of GPUs', ' single GPU mem'], dtype='object')

In [12]:
# Preview the machine inventory.
machine_df

Unnamed: 0,machineId,number of GPUs,single GPU mem
0,m0,2,12GB
1,m1,2,12GB
2,m100,2,12GB
3,m102,2,12GB
4,m105,2,12GB
...,...,...,...
547,m93,8,24GB
548,m94,8,24GB
549,m95,8,24GB
550,m96,8,24GB


In [None]:
# Builds one row per job that has both a utilization record in `data` and
# at least one machine recorded for its latest attempt, combining the job's
# trace metrics, its utilization figures and the hardware of that machine.

# Column order of the resulting table. 'num_attemps' (sic) is left
# misspelled so the header of the table, and of any CSV written from it,
# does not change.
new_df_columns = ['vc', 'jobid', 'num_attemps', 'user', 'status', 'num_gpus',
                  'runtime', 'queue_time', 'gpu_util_mean', 'gpu_util_min',
                  'gpu_util_max', 'cpu_util', 'mem_util', 'machine_gpu_mem',
                  'machine_gpu_num']

# Index both lookup tables once, instead of scanning a list and filtering a
# whole DataFrame for every job. keep='first' mirrors the previous
# `.values[0]`, which used the first matching row.
util_by_jobid = (
    data.drop_duplicates(subset='jobid', keep='first')
        .set_index('jobid')
        [['gpu_util_mean', 'gpu_util_min', 'gpu_util_max', 'cpu_util', 'mem_util']]
        .to_dict('index')
)
machine_by_id = (
    machine_df.drop_duplicates(subset='machineId', keep='first')
              .set_index('machineId')
              [[' single GPU mem', ' number of GPUs']]
              .to_dict('index')
)

records = []
for job in jobs:
    util = util_by_jobid.get(job.jobid)
    if util is None:
        # No utilization row for this job.
        continue
    # Jobs without any attempt used to raise IndexError on attempts[-1];
    # like attempts without machine details, they cannot be matched to a
    # machine and are skipped.
    if not job.attempts or not job.attempts[-1]['detail']:
        continue
    # Hardware is taken from the first machine of the latest attempt. An
    # id missing from the machine list raises KeyError rather than being
    # silently dropped.
    machine = machine_by_id[job.attempts[-1]['detail'][0]['ip']]
    records.append((
        job.vc,
        job.jobid,
        len(job.attempts),
        job.user,
        job.status,
        job.num_gpus,
        job.run_time,
        job.queueing_delay,
        util['gpu_util_mean'],
        util['gpu_util_min'],
        util['gpu_util_max'],
        util['cpu_util'],
        util['mem_util'],
        machine[' single GPU mem'],
        machine[' number of GPUs'],
    ))

# Assemble the frame in one go; growing it row by row with .loc gets
# slower with every row added.
new_df = pd.DataFrame(records, columns=new_df_columns)

In [28]:
# Preview the assembled per-job table.
new_df

Unnamed: 0,vc,jobid,num_attemps,user,status,num_gpus,runtime,queue_time,gpu_util_mean,gpu_util_min,gpu_util_max,cpu_util,mem_util,machine_gpu_mem,machine_gpu_num
0,11cb48,application_1506638472019_17235,1,066e99,Pass,1,1.100000,0.166667,25.591667,0.0,51.183333,2.108333,95.686876,24GB,8
1,6214e9,application_1506638472019_17145,1,2c46d5,Pass,1,49.183333,0.833333,0.000000,0.0,0.000000,48.659673,99.553043,12GB,2
2,6c71a0,application_1506638472019_0262,1,a04bf9,Pass,1,14344.366667,0.133333,90.033048,0.0,96.000000,6.554313,98.141959,24GB,8
3,6214e9,application_1506638472019_17148,1,2c46d5,Pass,1,45.983333,3.250000,0.000000,0.0,0.000000,18.046454,79.824283,12GB,2
4,6214e9,application_1506638472019_17159,1,2c46d5,Pass,1,46.100000,0.550000,0.000000,0.0,0.000000,21.289007,85.083449,12GB,2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
111606,11cb48,application_1508364558011_30316,1,066e99,Pass,1,0.950000,2.333333,0.000000,0.0,0.000000,0.562500,23.500736,24GB,8
111607,ee9e8c,application_1508364558011_30284,5,686d6e,Failed,8,67.616667,0.266667,0.000000,0.0,0.000000,0.694722,61.297971,12GB,2
111608,6c71a0,application_1508364558011_30313,1,450add,Killed,1,2.750000,1.133333,0.000000,0.0,0.000000,0.572500,22.961763,24GB,8
111609,6c71a0,application_1508364558011_30301,1,450add,Pass,1,31.600000,2.250000,46.726768,0.0,99.316667,7.037121,92.978473,24GB,8


In [29]:
# Write the assembled table next to the input traces.
new_df.to_csv(f'{LOGDIR}/preprocessed.csv')