#### Dependencies

In [1]:
# Pickle file dependencies
from tacc_stats.pickler.job_stats import Job
import cPickle as pickle

In [2]:
# System dependencies
from os import listdir
import time as clock
from IPython.display import clear_output

In [50]:
# Data manipulation dependencies
import pandas as pd
import numpy as np
import datetime as dt
from collections import OrderedDict 

In [4]:
# Directory of all pickled jobs via comet
# new_dir = '/oasis/projects/nsf/sys200/stats/xsede_stats/archive'
source_dir = '/oasis/projects/nsf/sys200/tcooper/xsede_stats/comet_pickles/'

# Directory to save to
save_dir = '../modules/data/raw/'

In [5]:
# List of date directories in source_dir
dates_list = [ date for date in listdir(source_dir) ]

In [6]:
len(dates_list)

152

#### Read in jobs from cleaned jobs directory

In [7]:
def prep_target( date_file ):
    jobs_list = []
    
    # open file and read the content in a list
    with open(date_file, 'r') as f:
        lines = f.readlines()
    
        for jobid in lines:
            current = jobid[:-1]
            jobs_list.append(current)
    
    return jobs_list

# Access and open pickled job files
**Process:**
    - Iterate through the non-empty date folders available in source_dir
    - A file is saved in valid_jobs if:
        * The pickled file is a Job object
    - Exceptions are skipped

In [8]:
# last compiled 3/16/20
pull_dates = dates_list[35:40]
n = float(sum([len(listdir(source_dir+date)) for date in pull_dates]))

In [9]:
job_objects = []
t0 = clock.time()
total = 0

for date in pull_dates:  
    for jobid in listdir(source_dir+date):
        total += 1
        clear_output(wait=True)
        print("Processing file {} of {} files \t ({}% of total files)".format(total, n, np.round( total/n*100, 2)))
        
        try:
            pickle_file = open( source_dir+date+'/'+jobid, 'rb')
            job_file = pickle.load(pickle_file)
            job_objects.append(job_file)
            pickle_file.close()
        except:
            next
            
        t2 = clock.time()
        print
        print("Run time: {}s".format(np.round(t2-t0, 1)))

Processing file 63614 of 63614.0 files 	 (100.0% of total files)

Run time: 379.5s


In [10]:
len(job_objects)

6062

## Prep Data Cleaning

In [11]:
def check_val( val ):
    try:
        val = float(val)
        return val
    except:
        return 0
    else:
        return 0
        
def convert_dt( val ):
    return dt.datetime.utcfromtimestamp( val ).strftime( "%Y-%m-%d %H:%M:%S" )

def get_schemas( job ):
    return { stat:schema.keys() for stat,schema in job.schemas.items() }

def get_indices( job, host ):
    indices = []
    stats = [ stat for stat in job.schemas.keys() if stat in host.stats.keys() ]
    schemas = { stat:schema.keys() for stat,schema in job.schemas.items() }
    cores = { stat:core.keys() for stat,core in host.stats.items() }
    
    for stat in stats:
        for core in cores[stat]:
            for schema in schemas[stat]:
                indices.append( (stat,core,schema) )
             
    return indices

def get_times( job, host ):
    times = [ job.start_time ]
    times.extend( host.times )
    times.append( job.end_time )
    return [ convert_dt(t) for t in times ]

def clean_list( data_list ):
    return [ check_val( x ) for x in data_list ]
    
def get_data( host, row_labels ):
    data = { label:[] for label in row_labels }
    
    for stat,node in host.stats.items():
        for core,matrix in node.items():
            matrix = matrix.T
            for i in range(len(matrix)):
                data[stat,core] = clean_list( matrix[i] )
    return data

def fill_df( template_df, data_dict):
    for row,data in data_dict.items():
        template_df.loc[row].update( pd.Series(data) )
    return df

In [47]:
def get_jobs( job_dfs, job_objects ):
    job_df_pairs = []
    wanted = job_dfs.keys()
    
    for idx in range(len(job_objects)):
        job = job_objects[idx]
        jobid = job.id
        
        if (jobid in wanted) and (len(job.times) > 8) and (len(job.hosts.keys()) == 1):
            job_df_pairs.append( (job, job_dfs[jobid]) )
        else:
            continue
            
    return job_df_pairs

In [None]:
#def find_next( current, unsorted ):
#    target = current + 00:10:00
#    found = unsorted[0]
#    proximity = target - found
#    
#    if len(unsorted) > 1:
#        for i in range(len(unsorted)):
#            if target - unsorted[i] < proximity:
#                found = unsorted[i]
#                proximity = target - found
#    return found
#
#def fill_sorted( start, unsorted ):
#    sorted_list = []
#    
#    for i in range(len(unsorted)):
#        current = sorted_list[i]
#        next_time = find_next( current, unsorted )
#        sorted_list[i+1] = next_time
#        
#def sort_times( job ):
#    start = job.start
#    mid = job.times
#    end = job.end
#    
#    if start == end:
#        return [start]
#    elif len(mid) < 1:
#        return [start, end]
#    elif len(mid) < 2:
#        return [start, mid[0], end]
#    else:
#        return fill_sorted( start, mid.append(end) )

### Loops in loops in loops (Cleaning data)
**Notes:**
    - If a value is missing from the data, it will be replaced with '0' for the purpose of this project
    - If a type of statistic was not collected on the job, that column is dropped from the DataFrame
    - Two files are created during each iteration:
         1) A .csv of the descriptive statistics for that host,job pair
         2) A full .csv of the host,job data from the formatted DataFrame
    - Naming convention: Files are labelled as '{host}_{jobid}' to support random lookup
         * A job run on multiple host nodes is processed and saved with each individual host,job pair *

In [12]:
m = float(sum([len(job.hosts.keys()) for job in job_objects]))

In [13]:
cut = len(job_objects)/2
first = 0 # 2025
stop = len(job_objects)
rem = stop - first

print "Total Jobs (this date):\t\t", len(job_objects)
print "Total Host,Job Pairs:\t\t", int(m)
print("------------------------------------")
print "Remaining jobs to scan:\t\t", int(rem)

Total Jobs (this date):		6062
Total Host,Job Pairs:		6264
------------------------------------
Remaining jobs to scan:		6062


In [14]:
job_dfs = {}
t0 = clock.time()
total = 0
current = 0

for job_idx in range( first, stop ):
    job = job_objects[ job_idx ]
    schemas = get_schemas( job )
    total += 1
    
    # support for tracking progress in below print statements
    clear_output(wait=True)
    
    # iterate through each host object job was run on
    for host_name, host in job.hosts.iteritems():
        print("Processing hosts for job {} of {} \t ({}% of total)".format(job_idx+1, stop, np.round( (current+first)/m*100, 2)))
        current += 1
        
        # build MultiIndex for df 
        idx_labels = get_indices( job, host )
        indices = pd.MultiIndex.from_tuples( idx_labels, names=['Stat', 'Device', 'Schema'] )
                    
        # process timestamps
        times = get_times( job, host )
    
        # collect job data
        data = get_data( host, idx_labels )
        
        # create df with MultiIndex, ordered times
        df = pd.DataFrame( index=indices, columns=times ).sort_index()
        
        # fill df
        for stat,devices in host.stats.items():
            for device,cycles in devices.items():
                for i in range(len(cycles)):
                    for j in range(len(cycles[i])):
                        try:
                            schema = schemas[stat][j]
                            df.loc[(stat,device,schema)] = check_val(cycles[i][j])
                        except:
                            next
        
        # save job info from DataFrame to csv file
        #df.to_csv( path_or_buf=save_dir+"{}_{}.csv".format( host_name, job.id ) )
        job_dfs[job.id] = df

Processing hosts for job 6062 of 6062 	 (99.98% of total)


In [15]:
# check that no job was missed
if total == ( stop-first ):
    print "Success!"
else:
    print stop - first - total, "jobs missing"

Success!


# Pull in CPICORE calculation

In [61]:
def check_len ( df, num ):
    return len(df.columns.values.tolist()) > num

def purge_str ( df ):
    for row,col in df.iterrows():
        for i in range(len(col.values)):
            val = col.values[i]
            time = col.index[i]
            
            # certain numeric responses are recorded as str
            if type(val) is str:
                try:
                    df.at[row,time] = np.float64( val )
                except:
                    df.at[row,time] = np.float64(0)
                else:
                    df.at[row,time] = np.float64(0)
                    
    return df

def get_id ( file_name ):
    host,jobid = file_name.split('_')
    return host[:11] , jobid[:7]

def get_dfs ( file_list, min_jobs, min_cycles=0 ):
    job_dfs = {}
    count = 0
    
    for i in range( len(file_list) ):
        
        if count < min_jobs:
            job_file = file_list[i]
            df = purge_str( pd.read_csv( source_dir+job_file, index_col=[0,1,2], low_memory=False ) )
            host,jobid = get_id( job_file )
        
            if check_len( df, min_cycles ):
                job_dfs[jobid] = df
            else:
                next
            
            count += 1
            
    return job_dfs
        
def sort_hosts ( file_list ):
    hosts = {}
    
    for job_file in file_list:
        host,jobid = job_file.split('_')
        
        if host in hosts:
            hosts[host].append( jobid[:7] )
        else:
            hosts[host] = [ jobid[:7] ]
            
    return hosts
            
def sort_jobs ( file_list, job_dfs ):
    jobs = {}
    
    for job_file in file_list:
        host,jobid = get_id( job_file )
        
        if jobid in job_dfs.keys():
            if jobid in jobs:
                jobs[jobid].append( host )
            else:
                jobs[jobid] = [ host ]
            
    return jobs, multiple_hosts(jobs)

def multiple_hosts ( jobs_dict ):
    return any( len(host) > 1 for job,host in jobs_dict.items() )

def cpicore ( job_df, monitor=False ):
    data = job_df.loc['intel_hsw']
    times = job_df.columns.tolist()
    cpicore_dict = OrderedDict( )
    
    for t in times:
        cpicore_dict[t] = 0
    
    for i in range(1, len(times)):
        chunk = data[times[:i+1]]
        devices = { row : np.mean(col.values) for row,col in chunk.iterrows() }
        avg_c = { key[0]:0 for key,val in devices.items() }
        sum_avgs = 0
        current = times[i]
        
        for key,val in avg_c.items():
            avg_c[ key ] = devices[ (key, 'CLOCKS_UNHALTED_CORE') ] / devices[ (key, 'INSTRUCTIONS_RETIRED') ]
    
        for key,val in avg_c.items():
            sum_avgs += val
            
        cpicore_dict[current] = sum_avgs/24
    
    if monitor:
        return cpicore_dict
    
    return sum_avgs/24

def cpiref ( job_df ):
    data = job_df.loc['intel_hsw'] 
    devices = { row : np.mean(col.values) for row,col in data.iterrows() }
    avg_d = { key[0]:0 for key,val in devices.items() }
    sum_avgs = 0
    
    for key,val in avg_d.items():
        avg_d[ key ] = devices[ (key, 'CLOCKS_UNHALTED_REF') ] / devices[ (key, 'INSTRUCTIONS_RETIRED') ]
        
    for key,val in avg_d.items():
        sum_avgs += val
        
    return sum_avgs/24

In [62]:
data = get_jobs( job_dfs, job_objects )
sample_set = {}
jobids = job_dfs.keys()

for job_tup in data:
    job = job_tup[0]
    job_data = job_tup[1]
    jobid = job.id
    date = convert_dt( job.times[-1] )[:10]
    host = job.hosts[ job.hosts.keys()[0] ].name
    core = np.round( cpicore( job_data ), 5 )
    ref = np.round( cpiref( job_data ), 5 )
    
    sample_set[jobid] = ( date, host, core, ref )

sample_set

{'4070662': ('2016-09-16', 'comet-14-66', 1.0002, 1.00081),
 '4244338': ('2016-09-16', 'comet-27-12', 1.00001, 1.00001),
 '4244341': ('2016-09-16', 'comet-06-55', 1.00001, 1.0),
 '4244933': ('2016-09-16', 'comet-25-01', 1.00025, 0.99964),
 '4255235': ('2016-09-16', 'comet-13-48', 1.00077, 1.00215),
 '4255874': ('2016-09-15', 'comet-06-02', 1.64812, 1.44424),
 '4256084': ('2016-09-16', 'comet-06-42', 1.9496, 1.76934),
 '4256234': ('2016-09-15', 'comet-06-08', 1.59796, 1.40492),
 '4256386': ('2016-09-17', 'comet-10-64', 1.0001, 1.00042),
 '4262035': ('2016-09-16', 'comet-20-10', 3.43046, 2.95714),
 '4262682': ('2016-09-17', 'comet-07-11', 1.00017, 1.00073),
 '4262823': ('2016-09-16', 'comet-07-05', 1.00023, 1.00018),
 '4263608': ('2016-09-16', 'comet-10-60', 1.92217, 1.51549),
 '4263622': ('2016-09-16', 'comet-20-53', 1.69925, 3.54011),
 '4264410': ('2016-09-16', 'comet-26-71', 1.00001, 1.00001),
 '4264648': ('2016-09-16', 'comet-06-38', 1.0014, 1.00121),
 '4264832': ('2016-09-16', 'come