<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [1]:
import pandas as pd
import numpy as np
from datetime import datetime

BASE_RAW_DATA_DIR = '../data/raw'

GPU_CSV_FILE = BASE_RAW_DATA_DIR + '/gpu.csv'
"""
str: gpu.csv file location 
"""

CHECK_CSV_FILE = BASE_RAW_DATA_DIR +  '/application-checkpoints.csv'
"""
str: application-checkpoints.csv filename file location 
"""

TASK_CSV_FILE = BASE_RAW_DATA_DIR + '/task-x-y.csv'
"""
str: task-x-y.csv file location 
"""

TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
"""
str: string used to format timestamp for datetime conversion
"""

def timestamp_conv(df):
    """ Converts a timestamp to datetime
    ----------
    df
        dataframe to convert to datetime
    -------
    float
         converted timestamp
    """
    df = df.apply(lambda x: datetime.strptime(x, TIMESTAMP_FORMAT))
    return(df)

def clean_gpu(gpu_df):
    """Clean gpu dataframe by dropping uneeded serial number and
    fixes timestamp format to datetime

    Parameters
    ----------
    gpu_df
        gpu dataframe to clean

    Returns
    -------
    pandas.core.frame.DataFrame
        Cleaned GPU dataframe

    """

    # Drop uneeded serial column

    gpu_df.drop(columns='gpuSerial', inplace=True)
    gpu_df['timestamp'] = timestamp_conv(gpu_df['timestamp'])
    
    return(gpu_df)

def merge_check_task(checkpoints_df, tasks_df):
    """merge (left join) checkpoints with task df through job and task id

    Parameters
    ----------
    checkpoints_df
        application checkpoints dataframe to merge

    tasks_df
        tasks dataframe to merge

    Returns
    -------
    pandas.core.frame.DataFrame
        Cleaned GPU dataframe

    """

    # Use left join on taskId and jobId

    check_task_df = checkpoints_df.merge(tasks_df,
                                     on=['taskId', 'jobId'], how='left')
    return (check_task_df)

def clean_check_task(check_task_df):
    """Removes uneeded ids for merged application checkpoints and tasks df 
    and fixes timestamp format to datetime

    Parameters
    ----------
    check_task_df
         merged application checkpoints and tasks df to clean

    Returns
    -------
    pandas.core.frame.DataFrame
        Cleaned GPU dataframe

    """

    # Drop uneeded ids

    check_task_df.drop(columns= ['jobId', 'taskId'], inplace=True)
    check_task_df['timestamp'] = timestamp_conv(check_task_df['timestamp'])

    return(check_task_df)

gpu_df = pd.read_csv(GPU_CSV_FILE)
checkpoints_df = pd.read_csv(CHECK_CSV_FILE)
tasks_df = pd.read_csv(TASK_CSV_FILE)

gpu_df = clean_gpu(gpu_df)
check_task_df = merge_check_task(checkpoints_df, tasks_df)
check_task_df = clean_check_task(check_task_df)

In [2]:
check_task_df_start = check_task_df[check_task_df['eventType'] == 'START'].copy()
check_task_df_stop = check_task_df[check_task_df['eventType'] == 'STOP'].copy()

check_task_df_start.rename(index=str, columns={"timestamp": "start_time"}, inplace = True)
check_task_df_stop.rename(index=str, columns={"timestamp": "stop_time"}, inplace = True)

check_task_df_stop.drop('eventType', axis = 1, inplace = True)
check_task_df_start.drop('eventType', axis = 1, inplace = True)

In [3]:
check_task_flat_df = pd.merge(
    check_task_df_start, check_task_df_stop,
    on=['hostname', 'eventName', 'x', 'y', 'level'])

In [4]:
check_task_flat_df.head()

Unnamed: 0,start_time,hostname,eventName,x,y,level,stop_time
0,2018-11-08 07:42:29.842,0d56a730076643d585f77e00d2d8521a00000N,Saving Config,238,4,12,2018-11-08 07:42:29.845
1,2018-11-08 07:42:29.845,0d56a730076643d585f77e00d2d8521a00000N,Render,238,4,12,2018-11-08 07:43:10.965
2,2018-11-08 07:43:56.239,0d56a730076643d585f77e00d2d8521a00000N,Uploading,238,5,12,2018-11-08 07:43:57.245
3,2018-11-08 07:44:47.555,0d56a730076643d585f77e00d2d8521a00000N,Saving Config,151,75,12,2018-11-08 07:44:47.557
4,2018-11-08 07:47:38.457,0d56a730076643d585f77e00d2d8521a00000N,Uploading,102,109,12,2018-11-08 07:47:39.357


In [5]:
df_A = pd.DataFrame({'start_date':['2017-03-27','2017-01-10'],'end_date':['2017-04-20','2017-02-01']})
df_B = pd.DataFrame({'event_date':['2017-01-20','2017-01-27'],'price':[100,200]})

df_A['end_date'] = pd.to_datetime(df_A.end_date)
df_A['start_date'] = pd.to_datetime(df_A.start_date)
df_B['event_date'] = pd.to_datetime(df_B.event_date)

df_A = df_A.assign(key=1)
df_B = df_B.assign(key=1)
df_merge = pd.merge(df_A, df_B, on='key').drop('key',axis=1)

df_merge = df_merge.query('event_date >= start_date and event_date <= end_date')

df_out = df_A.merge(df_merge, on=['start_date','end_date'], how='left').fillna('').drop('key', axis=1)

print(df_out)

  start_date   end_date           event_date price
0 2017-03-27 2017-04-20                           
1 2017-01-10 2017-02-01  2017-01-20 00:00:00   100
2 2017-01-10 2017-02-01  2017-01-27 00:00:00   200


In [5]:
check_task_flat_df_r = check_task_flat_df[
    (check_task_flat_df['start_time'] >= gpu_df['timestamp'][0]) 
    & (check_task_flat_df['stop_time'] <= gpu_df['timestamp'][len(gpu_df)-1])].copy()

len(check_task_flat_df_r)

17252

In [6]:
check_task_flat_df_r.index = pd.IntervalIndex.from_arrays(
    check_task_flat_df_r['start_time'],
    check_task_flat_df_r['stop_time'], 
    closed='both')

gpu_df['eventName'] = gpu_df['timestamp'].apply(
    lambda x : check_task_flat_df_r.iloc[
        check_task_flat_df_r.index.get_loc(x)]['eventName'])

KeyError: ('datetime64[ns]', 'both')

In [22]:
import pandasql as ps
import sqlite3

# connection to sql
conn = sqlite3.connect(':memory:')

# move dataframes to sql
check_task_flat_df_r.to_sql('CheckTask', conn, index=False)
gpu_df.to_sql('Gpu', conn, index=False)

# SQL query
query = '''
SELECT *
FROM Gpu
LEFT JOIN CheckTask ON gpu.hostname = CheckTask.hostname
    AND (gpu.timestamp >= CheckTask.start_time AND gpu.timestamp <= CheckTask.stop_time)
'''

newdf = pd.read_sql_query(query, conn)

In [26]:
len(newdf.dropna().head())

5

In [12]:
newdf.head()

Unnamed: 0,timestamp,hostname,gpuUUID,powerDrawWatt,gpuTempC,gpuUtilPerc,gpuMemUtilPerc,start_time,hostname.1,eventName,x,y,level,stop_time
0,2018-11-08 08:27:11.089000,0d56a730076643d585f77e00d2d8521a00000Q,GPU-d84a1024-9381-c725-3b85-dd7143e64c35,27.18,35,0,0,2018-11-08 08:27:10.606000,0d56a730076643d585f77e00d2d8521a00000Q,TotalRender,156,186,12,2018-11-08 08:27:54.895000
1,2018-11-08 08:27:11.089000,0d56a730076643d585f77e00d2d8521a00000Q,GPU-d84a1024-9381-c725-3b85-dd7143e64c35,27.18,35,0,0,2018-11-08 08:27:10.608000,0d56a730076643d585f77e00d2d8521a00000Q,Render,156,186,12,2018-11-08 08:27:53.796000
2,2018-11-08 08:27:10.949000,83ea61ac1ef54f27a3bf7bd0f41ecaa700000J,GPU-8792a29c-529e-1837-1806-c669cd9b1960,42.44,41,0,0,2018-11-08 08:27:10.839000,83ea61ac1ef54f27a3bf7bd0f41ecaa700000J,Uploading,200,23,12,2018-11-08 08:27:11.893000
3,2018-11-08 08:27:10.949000,83ea61ac1ef54f27a3bf7bd0f41ecaa700000J,GPU-8792a29c-529e-1837-1806-c669cd9b1960,42.44,41,0,0,2018-11-08 08:27:10.846000,83ea61ac1ef54f27a3bf7bd0f41ecaa700000J,Tiling,200,23,12,2018-11-08 08:27:11.882000
4,2018-11-08 08:27:10.760000,b9a1fa7ae2f74eb68f25f607980f97d700000H,GPU-38bbf3b6-80fb-7e6f-6678-ee45035507ab,42.21,37,0,0,2018-11-08 08:27:10.612000,b9a1fa7ae2f74eb68f25f607980f97d700000H,TotalRender,160,14,12,2018-11-08 08:27:56.265000


In [None]:
check_task_flat_df_r
gpu_df

df_merge = pd.merge(check_task_flat_df_r, gpu_df, on='hostname')

df_merge = df_merge.query('timestamp >= start_time and timestamp <= stop_time')

df_out = df_A.merge(df_merge, on=['start_date','end_date'], how='left').dropna()

df_out.head()

In [None]:
len(timestamp_df)

In [None]:
# set keys as indexes for join 
    
#gpu_df.set_index('timestamp', inplace=True)
#check_task_df.set_index('timestamp', inplace=True)
    
# sort by index

#gpu_df.sort_index(inplace=True)
#check_task_df.sort_index(inplace=True)

gpu_df.sort_values(by=['timestamp'], inplace=True)
check_task_df.sort_values(by=['timestamp'], inplace=True)

# Make timestamp df

timestamp_df = check_task_df.copy()
timestamp_df.drop(['hostname', 'eventName', 'eventType', 'x', 'y', 'level'], axis=1, inplace= True)

timestamp_df = pd.merge_asof(timestamp_df,gpu_df,
                       left_on = ['timestamp'], right_on = ['timestamp'],
                       tolerance = pd.Timedelta('4ms'), direction = 'nearest')

In [None]:
len(timestamp_df)

In [None]:
timestamp_df.head()

In [None]:
timestamp_df.isnull().sum()

In [None]:
timestamp_df.dropna(inplace = True)

In [None]:
timestamp_df.isnull().sum()

In [None]:
len(timestamp_df)

In [None]:
check_task_gpu_df = pd.merge(timestamp_df, check_task_df, on = ['hostname', 'timestamp'])

In [None]:
check_task_gpu_df.isnull().sum()

In [None]:
len(check_task_gpu_df)

In [None]:
timestamp_df.head()

In [None]:
check_task_df.head()

In [None]:
check_task_gpu_df.head()

In [None]:
check_task_gpu_df.isnull().sum()

In [None]:
len(check_task_gpu_df)