In [13]:
import logging
import pandas as pd
from pathlib import Path
from datetime import datetime
import sqlite3
import os
import matplotlib.pyplot as plt

In [14]:
# Input and output directories, relative to the notebook's working directory.
data_raw_dir = os.path.join(os.curdir, 'raw')
data_processed_dir = os.path.join(os.curdir, 'processed')

In [15]:
# Raw input files and the processed output file. Every path is built with
# os.path.join so the separators match data_raw_dir / data_processed_dir.
gpu_csv = os.path.join(data_raw_dir, 'gpu.csv')
check_csv = os.path.join(data_raw_dir, 'application-checkpoints.csv')
task_csv = os.path.join(data_raw_dir, 'task-x-y.csv')
processed_csv = os.path.join(data_processed_dir, 'processed.csv')
# strptime-style format of the raw timestamp strings,
# e.g. 2018-11-08T07:41:55.921Z. The trailing 'Z' is matched as a literal
# character (it is not a %z directive), so parsed datetimes are tz-naive.
timestamp_form = '%Y-%m-%dT%H:%M:%S.%fZ'

In [16]:
def processed_gpu(gpu_data):
    """Clean the raw GPU telemetry frame.

    Drops the ``gpuSerial`` column and parses the ``timestamp`` strings into
    datetimes via ``processed_timestamp``.

    Parameters
    ----------
    gpu_data : pandas.DataFrame
        GPU rows as read from ``gpu.csv``; must contain ``gpuSerial`` and
        ``timestamp`` columns.

    Returns
    -------
    pandas.DataFrame
        A new, cleaned frame. The caller's ``gpu_data`` is left untouched.
    """
    # Work on a new frame instead of dropping in place: mutating the input
    # means a second call on the same raw frame (e.g. re-running the cell)
    # fails with a KeyError on the already-removed column.
    gpu_clean = gpu_data.drop(columns='gpuSerial')
    gpu_clean['timestamp'] = processed_timestamp(gpu_clean['timestamp'])
    return gpu_clean

In [17]:
def processed_timestamp(df, fmt=None):
    """Parse a Series of timestamp strings into datetimes.

    Parameters
    ----------
    df : pandas.Series
        Timestamp strings. Despite the name, callers pass a single column
        (e.g. ``gpu_data['timestamp']``).
    fmt : str, optional
        strptime-style format. When omitted, the module-level
        ``timestamp_form`` is used (looked up at call time).

    Returns
    -------
    pandas.Series
        ``datetime64`` values with the same index as ``df``.

    Raises
    ------
    ValueError
        If a value does not match ``fmt``.
    """
    if fmt is None:
        fmt = timestamp_form
    # Vectorised parse instead of a row-by-row datetime.strptime apply:
    # much faster on large telemetry files, and an empty input still comes
    # back with a datetime dtype.
    return pd.to_datetime(df, format=fmt)

In [18]:
def processed_checktask(checktask_data):
    """Clean the merged checkpoint/task frame.

    Drops the ``jobId`` and ``taskId`` columns (only needed for the earlier
    checkpoint/task join) and parses ``timestamp`` strings into datetimes
    via ``processed_timestamp``.

    Parameters
    ----------
    checktask_data : pandas.DataFrame
        Output of ``merge_check_task``; must contain ``jobId``, ``taskId``
        and ``timestamp`` columns.

    Returns
    -------
    pandas.DataFrame
        A new, cleaned frame. The caller's ``checktask_data`` is left
        untouched.
    """
    # Non-inplace drop so the input frame is not mutated and the call is
    # safe to repeat on the same data.
    checktask_clean = checktask_data.drop(columns=['jobId', 'taskId'])
    checktask_clean['timestamp'] = processed_timestamp(checktask_clean['timestamp'])
    return checktask_clean

In [19]:
# Columns that together identify one task event (one START/STOP pair).
_TASK_KEYS = ('hostname', 'eventName', 'x', 'y', 'level')


def _pair_start_stop(checktask_data):
    """Collapse START/STOP rows into one row per event with start_time/stop_time."""
    event_type = checktask_data['eventType']
    # .loc selection followed by non-inplace drop/rename yields independent
    # frames, so no SettingWithCopyWarning is raised.
    starts = (checktask_data.loc[event_type == 'START']
              .drop(columns='eventType')
              .rename(columns={'timestamp': 'start_time'}))
    stops = (checktask_data.loc[event_type == 'STOP']
             .drop(columns='eventType')
             .rename(columns={'timestamp': 'stop_time'}))
    return pd.merge(starts, stops, on=list(_TASK_KEYS))


def _join_gpu_samples(gpu_data, tasks):
    """Pair every GPU sample with each task running on the same host at that time."""
    # An in-memory SQLite database performs the range join
    # (timestamp BETWEEN start_time AND stop_time), which is awkward to
    # express as a plain pandas merge.
    con = sqlite3.connect(':memory:')
    try:
        tasks.to_sql('CheckTask', con, index=False)
        gpu_data.to_sql('Gpu', con, index=False)
        # NOTE(review): the comparisons rely on pandas storing the datetimes
        # as uniformly formatted ISO text, which sorts chronologically.
        query = '''
        SELECT Gpu.*,
               CheckTask.eventName, CheckTask.x, CheckTask.y, CheckTask.level,
               CheckTask.start_time, CheckTask.stop_time
        FROM Gpu
        JOIN CheckTask
          ON Gpu.hostname = CheckTask.hostname
         AND Gpu.timestamp BETWEEN CheckTask.start_time AND CheckTask.stop_time
        '''
        return pd.read_sql_query(query, con)
    finally:
        # Using the connection as a context manager would only commit, so
        # close it explicitly to release the database.
        con.close()


def checktask_gpu_merge(gpu_data, checktask_data):
    """Average GPU telemetry over the lifetime of every task event.

    Parameters
    ----------
    gpu_data : pandas.DataFrame
        Cleaned GPU samples with a datetime ``timestamp``, ``hostname``,
        ``gpuUUID`` and the ``powerDrawWatt``/``gpuTempC``/``gpuUtilPerc``/
        ``gpuMemUtilPerc`` metrics.
    checktask_data : pandas.DataFrame
        Cleaned checkpoint events with a datetime ``timestamp``, an
        ``eventType`` of 'START' or 'STOP', and the key columns hostname,
        eventName, x, y and level.

    Returns
    -------
    pandas.DataFrame
        One row per task event, in first-seen order. Each row holds the mean
        of every GPU metric over samples taken on the same host between the
        event's start_time and stop_time (inclusive), plus start_time,
        stop_time and the first matching gpuUUID. start_time/stop_time come
        back from SQLite as text rather than datetimes.

        Events are omitted when they have no matching samples, or when they
        start before or stop after the time span covered by ``gpu_data``.
    """
    tasks = _pair_start_stop(checktask_data)

    # Keep only events that lie entirely inside the period covered by the
    # GPU samples. min()/max() don't assume gpu_data is sorted or has a
    # 0..n-1 index, and yield NaT (hence no rows) for an empty gpu_data.
    gpu_times = gpu_data['timestamp']
    in_span = ((tasks['start_time'] >= gpu_times.min())
               & (tasks['stop_time'] <= gpu_times.max()))
    tasks = tasks[in_span]

    samples = _join_gpu_samples(gpu_data, tasks)

    # Per-task aggregation: metrics are averaged, descriptive columns take
    # the first value seen in the group.
    functions = {
        'powerDrawWatt': 'mean', 'gpuTempC': 'mean',
        'gpuUtilPerc': 'mean', 'gpuMemUtilPerc': 'mean',
        'start_time': 'first', 'stop_time': 'first',
        'gpuUUID': 'first',
    }
    return samples.groupby(
        list(_TASK_KEYS), as_index=False, sort=False
    ).agg(functions)

In [20]:
def merge_check_task(checkpoints_data, tasks_data):
    """Attach task attributes to every checkpoint event.

    Left-joins ``tasks_data`` onto ``checkpoints_data`` on the
    (``taskId``, ``jobId``) pair, so every checkpoint row is kept even when
    no matching task row exists (its task columns are then NaN).

    Parameters
    ----------
    checkpoints_data : pandas.DataFrame
        Rows read from ``application-checkpoints.csv``.
    tasks_data : pandas.DataFrame
        Rows read from ``task-x-y.csv``.

    Returns
    -------
    pandas.DataFrame
        The joined frame.
    """
    join_keys = ['taskId', 'jobId']
    return pd.merge(checkpoints_data, tasks_data, how='left', on=join_keys)

In [26]:
def final():
    """Build the processed dataset from the raw CSV exports.

    Reads ``gpu_csv``, ``check_csv`` and ``task_csv`` (under
    ``data_raw_dir``, i.e. ./raw), cleans and joins them, and writes the
    per-task GPU averages to ``processed_csv`` (under ./processed). The
    output directory is created if it does not exist yet.
    """
    logger = logging.getLogger(__name__)
    logger.info('making final data set from raw data')

    # Read datasets in
    gpu_data = pd.read_csv(gpu_csv)
    checkpoints_data = pd.read_csv(check_csv)
    tasks_data = pd.read_csv(task_csv)

    # Cleaning and merging process
    gpu_data = processed_gpu(gpu_data)
    checktask_data = merge_check_task(checkpoints_data, tasks_data)
    checktask_data = processed_checktask(checktask_data)
    checktask_gpu_data = checktask_gpu_merge(gpu_data, checktask_data)

    # Save final dataset. to_csv does not create missing directories, so
    # make sure the output folder exists first.
    out_dir = os.path.dirname(processed_csv)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    checktask_gpu_data.to_csv(processed_csv)
    logger.info('wrote %d rows to %s', len(checktask_gpu_data), processed_csv)

if __name__ == '__main__':
    # Configure root logging so final()'s INFO progress messages are shown.
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
    final()

2020-01-22 22:54:16,734 - __main__ - INFO - making final data set from raw data
