## Merging of Per-Core Logs
### Can we assume the following about per-core behavior?
#### If all cores of a running experiment do similar work, then their individual behaviors - as exposed by their per-core logs - are similar to each other, and hence, the behavior of all cores can be merged into an overall behavior of the full experimental run.

In [1]:
import os
import time
import pandas as pd
import numpy as np

In [2]:
import eigen_analysis

# Column names for the raw per-core log files (passed as `names=` when the logs are read).
cols = eigen_analysis.LINUX_COLS
# Multiplier applied to raw log timestamps (presumably ticks -> seconds; confirm in eigen_analysis).
time_unit = eigen_analysis.TIME_CONVERSION_khz
# Multiplier applied to raw 'joules' readings (presumably RAPL units -> joules; confirm in eigen_analysis).
joules_unit = eigen_analysis.JOULE_CONVERSION

In [3]:
def list_itrs(rdtsc_dirname):
    """Return the distinct ITR tags explored for one (dvfs, qps) pair.

    Every entry name in `rdtsc_dirname` is split on '_' and its second field
    is taken as the ITR value (names are built as `<prefix>_<itr>_<dvfs>_...`).

    Args:
        rdtsc_dirname: path of the directory holding the rdtsc files.

    Returns:
        Sorted list of unique ITR tags (strings). Sorting is lexicographic,
        which only serves to make the order reproducible between runs.
    """
    itrs = set()
    for entry in os.listdir(rdtsc_dirname):
        fields = entry.split('_')
        # entries without a second '_'-separated field carry no ITR tag;
        # skip them instead of failing with an IndexError
        if len(fields) > 1:
            itrs.add(fields[1])
    return sorted(itrs)

In [4]:
def get_rdtsc_from_summary(summary_file, sys, run, itr, dvfs, rapl, qps):
    """Look up the RDTSC start/end timestamps of one experiment in a summary CSV.

    The summary is a space-separated file with (at least) the columns
    'sys', 'i', 'itr', 'dvfs', 'rapl', 'QPS_reported', 'RDTSC_START' and
    'RDTSC_END'. The first row matching all six key values is used.

    The key columns are read as text and compared against `str(value)`.
    Otherwise pandas infers integer columns for values such as '0' or '135',
    and the string tags used throughout this notebook would never match.

    Args:
        summary_file: path of the summary CSV.
        sys, run, itr, dvfs, rapl, qps: key values identifying the experiment
            (matched against 'sys', 'i', 'itr', 'dvfs', 'rapl', 'QPS_reported').

    Returns:
        (start, end) taken from 'RDTSC_START' / 'RDTSC_END', or (0, 0) when
        no row matches.
    """
    wanted = {
        'sys': sys,
        'i': run,
        'itr': itr,
        'dvfs': dvfs,
        'rapl': rapl,
        'QPS_reported': qps,
    }
    df = pd.read_csv(summary_file, sep=' ', dtype={col: str for col in wanted})

    mask = pd.Series(True, index=df.index)
    for col, value in wanted.items():
        mask &= (df[col] == str(value))
    matches = df[mask]

    if matches.shape[0] == 0:
        return 0, 0
    return matches['RDTSC_START'].values[0], matches['RDTSC_END'].values[0]

In [5]:
def print_err_log(dvfs, qps, itr, err_dir, rapl='135', run='0'):
    """Print the error log of one (run, itr, dvfs, rapl, qps) experiment.

    The log is expected at
    `err_dir + 'err_log_<run>_<itr>_<dvfs>_<rapl>_<qps with last char replaced by 000>'`;
    `err_dir` is concatenated as-is, so it must end with a path separator.

    Prints a blank line, then either the file contents or 'EMPTY ERROR LOG'
    when no such file exists.
    """
    print()
    err_path = err_dir + 'err_log_' + run + '_' + itr + '_' + dvfs + '_' + rapl + '_' + qps[:-1] + '000'
    if os.path.exists(err_path):
        # context manager so the handle is released even if printing fails
        with open(err_path, 'r') as err_file:
            print(err_file.read())
    else:
        print('EMPTY ERROR LOG')

In [6]:
def handle_neg_diffs(df_diffs, df, core, err_file):
    """Repair or drop rows of `df_diffs` that contain a negative difference.

    Args:
        df_diffs: frame of consecutive differences with the columns
            'joules_diff', 'instructions_diff', 'cycles_diff',
            'ref_cycles_diff', 'llc_miss_diff' and 'timestamp_diff'.
            It is not modified; a corrected copy is returned.
        df: the frame the differences were computed from (same index labels,
            columns without the '_diff' suffix).
        core: core identifier, used only in the error-log messages.
        err_file: writable text file object that receives one report per
            affected row.

    Returns:
        A copy of `df_diffs` in which
        * rows with a negative 'joules_diff' and 'timestamp_diff' >= 0.001 are
          treated as an energy-counter overflow and get 'joules_diff'
          recomputed from the previous and current raw readings;
        * every other row with a negative value is logged and dropped.
    """
    tmp = df_diffs.copy()

    # isolating rows with negative diffs
    diff_cols = ['joules_diff', 'instructions_diff', 'cycles_diff',
                 'ref_cycles_diff', 'llc_miss_diff', 'timestamp_diff']
    neg_rows = tmp[(tmp[diff_cols] < 0).any(axis=1)]
    if neg_rows.empty:
        return tmp

    # previous raw reading of every row; computed once because `df` is never
    # modified inside the loop
    prev_rows = df.shift(1)

    # re-computing diffs if possible; else dropping rows
    for i in neg_rows.index:
        prev = prev_rows.loc[i]
        cur = df.loc[i]
        # TODO note that we are only handling case of RAPL-energy-status register overflow
        if (tmp.loc[i, 'joules_diff'] < 0) and (tmp.loc[i, 'timestamp_diff'] >= 0.001):
            err_file.write('CORE ' + str(core) + '  ---  JOULES COUNTER OVERFLOW AT LOG ENTRY DIFF #' + str(i) + '\n')
            # NOTE(review): a 32-bit counter wraps modulo 2**32; the original
            # (2**32 - 1) constant is kept unchanged here - confirm intent
            tmp.loc[i, 'joules_diff'] = (2**32 - 1) * joules_unit - prev['joules'] + cur['joules']
        else:
            err_file.write('CORE ' + str(core) + '  ---  UNEXPLAINED NEGATIVE VALS AT LOG ENTRY DIFF # ' + str(i) + '\n')
            # dump the column names and the two raw readings involved
            header = ''.join(col + '  ' for col in df.columns)
            prevs = ''.join(str(prev[col]) + '  ' for col in df.columns)
            currs = ''.join(str(cur[col]) + '  ' for col in df.columns)
            err_file.write('          ' + header + '\n')
            err_file.write('         log[' + str(i-1) + ']: ' + prevs + '\n')
            err_file.write('         log[' + str(i) + ']: ' + currs + '\n')
            tmp = tmp.drop(i, axis=0)

    return tmp

In [7]:
def save_merged_logs(merged_counters_df, merged_non_counters_df, dvfs, qps, itr, app, rapl='135', run='0'):
    """Write the merged counter / non-counter frames as gzip-compressed CSV.

    Output layout (relative to the current working directory):
        <app>_<run>_rapl_<rapl>/<qps>_merged/<qps>_<dvfs>_counters_merged/
            <dvfs>_<qps>_<itr>_counters_merged.gz
        <app>_<run>_rapl_<rapl>/<qps>_merged/<qps>_<dvfs>_non_counters_merged/
            <dvfs>_<qps>_<itr>_non_counters_merged.gz

    Directories are created as needed. The frames are written with pandas'
    own gzip support instead of shelling out to mkdir/gzip/ls, so the
    function also works outside IPython and with paths that contain spaces.
    After writing, the contents of both output directories are printed.

    Args:
        merged_counters_df: frame of merged counter data.
        merged_non_counters_df: frame of merged non-counter data.
        dvfs, qps, itr, app, rapl, run: string tags used to build the paths.
    """
    app_dir = app + '_' + run + '_rapl_' + rapl + '/'
    merged_dir = app_dir + qps + '_merged/'
    outputs = (
        ('COUNTERS DIR', merged_counters_df, '_counters_merged'),
        ('NON_COUNTERS DIR', merged_non_counters_df, '_non_counters_merged'),
    )

    listings = []
    for label, frame, suffix in outputs:
        out_dir = merged_dir + qps + '_' + dvfs + suffix + '/'
        os.makedirs(out_dir, exist_ok=True)
        out_file = out_dir + dvfs + '_' + qps + '_' + itr + suffix + '.gz'
        frame.to_csv(out_file, compression='gzip')
        listings.append((label, out_dir))

    # report what each output directory now contains, one entry per line
    for label, out_dir in listings:
        print(f'{label}: {out_dir}')
        for name in sorted(os.listdir(out_dir)):
            print(name)

In [8]:
def concat_merge_core_logs(dvfs, qps, itr, app, rapl='135', run='0'):
    print(f'Concatenating and merging per-core logs (DVFS={dvfs} ITR-DELAY={itr}, QPS={qps})')
    app_dir = app + '_' + run + '_rapl_' + rapl + '/'
    logs_dir = app_dir + qps + '_qps/linux_' + app + '_dmesg_' + run + '_' + dvfs + '_' + rapl + '_' + qps + '/'
    err_dir = app_dir + 'err_logs/'
    err_filename = 'err_log_' + run + '_' + itr + '_' + dvfs + '_' + rapl + '_' + qps[:-1] + '000'
    err_file = open(err_dir + err_filename, 'w')

    counters_full_df = pd.DataFrame()
    non_counters_full_df = pd.DataFrame()    
    df_merged_counters = pd.DataFrame()
    df_merged_non_counters = pd.DataFrame()

    # fetching experiment time syncing endpoints
    rdtsc_dir = app_dir + qps + '_qps/linux_' + app +'_rdtsc_' + run + '_' + dvfs + '_' + rapl + '_' + qps +'/'
    rdtsc_file = rdtsc_dir + 'linux.' + app + '.rdtsc.' + run + '_' + itr + '_' + dvfs + '_' + rapl + '_' + qps[:-1] + '000'
    start, end = eigen_analysis.get_rdtsc(rdtsc_file)
    if ((start == 0) or (end == 0)):
        if app == 'mcdsilo':
            start, end = get_rdtsc_from_summary(app_dir + 'mcdsilo_combined.csv', 'linux', run, itr, dvfs, rapl, '50k')
        if app == 'mcd':
            # TODO
            print('ALERT: MEMCACHED LOGS MISSING RDTSC DATA!')
            print()
    # abort if rdtsc endpoints are not found anywhere
    if ((start == 0) or (end == 0)):
        print('ALERT: Unable to sync log with START and END rdtsc timestamps...')
        print('-------------------------------------------------- ABORTING -------------------------')
        print()
        abort = True
        return df_merged_counters, df_merged_non_counters, abort
    print(f'START_RDTSC = {start}, END_RDTSC = {end}')
    abort = False

    # concatenating then merging per-core logs
    cores = []
    cores = !ls $logs_dir | cut -d '_' -f2 | sort | uniq
    for c in cores:
        df_merged_counters = pd.DataFrame(columns=['instructions_diff', 'cycles_diff', 'ref_cycles_diff', \
                                     'llc_miss_diff', 'joules_diff', 'timestamp'])
        df_merged_non_counters = pd.DataFrame(columns=['rx_bytes', 'rx_desc', 'tx_bytes', 'tx_desc', 'timestamp'])

        log_file = logs_dir + 'linux.' + app + '.dmesg.' + run + '_' + str(c) + '_' + itr + '_' + dvfs + '_' + rapl + '_' + qps[:-1] + '000'
        !gunzip -v $log_file'.gz'        
        df = pd.read_csv(log_file, sep = ' ', names = cols, index_col='i')
        !gzip -v9 $log_file
        df = df[(df['timestamp'] >= start) & (df['timestamp'] <= end)]
        df.loc[:, 'timestamp'] = df['timestamp'] * time_unit
        df.loc[:, 'joules'] = df['joules'] * joules_unit

        # separating per-core log data into counter and non-counter data
        # each occurring at different timescales (millisecond vs microsecond scale)
        counters_df = df[['joules', 'instructions', 'cycles', 'ref_cycles', 'llc_miss', 'timestamp']].copy()
        non_counters_df = df[['rx_bytes', 'rx_desc', 'tx_bytes', 'tx_desc', 'timestamp']].copy()                                                          

        # computing counter data diffs 
        counters_df = counters_df[(counters_df['joules'] > 0) & (counters_df['instructions'] > 0) \
                                            & (counters_df['cycles'] > 0) & (counters_df['ref_cycles'] > 0) \
                                            & (counters_df['llc_miss'] > 0)]
        timestamps = counters_df['timestamp'] # maintaining timestamps for merging across timestamp col     
        df_diffs = counters_df.diff().dropna().copy()
        df_diffs.columns = [f'{c}_diff' for c in df_diffs.columns]
        df_diffs = handle_neg_diffs(df_diffs, counters_df, c, err_file)
        df_diffs = df_diffs.drop(['timestamp_diff'], axis=1)
        df_diffs['timestamp'] = timestamps    
        
        # merging counter data
        if counters_full_df.shape[0] == 0:
            counters_full_df = df_diffs.copy()
        else:
            counters_full_df = counters_full_df.merge(df_diffs, \
                                                      left_on = 'timestamp', \
                                                      right_on = 'timestamp', \
                                                      how='outer', \
                                                      sort=True, \
                                                      suffixes=('', '_0')).fillna(0) 
            for col in df_merged_counters.columns:
                if col == 'timestamp':
                    break
                else:
                    df_merged_counters[col] = (counters_full_df[[col, col+'_0']].sum(axis=1))    
            df_merged_counters['timestamp'] = counters_full_df['timestamp']
            counters_full_df = df_merged_counters.copy()
            
        # merging non-counter data
        if non_counters_full_df.shape[0] == 0:
            non_counters_full_df = non_counters_df.copy()
        else:
            non_counters_full_df = non_counters_full_df.merge(non_counters_df, \
                                                              left_on = 'timestamp', \
                                                              right_on = 'timestamp', \
                                                              how='outer', \
                                                              sort=True, \
                                                              suffixes=('', '_0')).fillna(0) 
            for col in df_merged_non_counters.columns:
                if col == 'timestamp':
                    break
                else:
                    df_merged_non_counters[col] = (non_counters_full_df[[col, col+'_0']].sum(axis=1))
            df_merged_non_counters['timestamp'] = non_counters_full_df['timestamp']
            non_counters_full_df = df_merged_non_counters.copy()
                
        print()
        print('CORE: ', str(c))
        print('         MERGED COUNTERS: ', df_merged_counters.shape)
        print('         MERGED NON COUNTERS: ', df_merged_non_counters.shape)
        
    err_file.close()
    if (os.path.getsize(err_dir + err_filename) == 0):
        os.remove(err_dir + err_filename)
        
    print()
    print('-------------------------------------------------- PARSED 16 LOGS -------------------------')
    print()
    return df_merged_counters, df_merged_non_counters, abort    

### Merging per-core logs

In [9]:
# Experiment selection. These string tags are concatenated into the
# directory and file names used by the cells below.
app = 'mcd'     # application tag ('mcd' is reported as MEMCACHED by concat_merge_core_logs)
run = '0'       # run tag
rapl = '135'    # RAPL tag (presumably the power limit; confirm)
#dvfs = '0x1300'
qps = '600k'    # load tag; its last character is replaced by '000' when file names are built

In [10]:
# Root directory of the selected experiment, e.g. 'mcd_0_rapl_135/' (relative to the working directory).
app_dir = app + '_' + run + '_rapl_' + rapl + '/'

In [11]:
# Sanity check: show the experiment root that the merge loop will read from and write to.
print(app_dir)
#print(logs_dir)
#print(rdtsc_dir)
#print(f'cores: {cores}')
#print(f'itrs: {itrs}')

mcd_0_rapl_135/


In [12]:
skip_log = False
#for dvfs in ['0xf00', '0x1100']:
#for dvfs in ['0x1500', '0x1700']:
for dvfs in ['0x1d00']:
#, '0x1b00', '0x1d00']:
#for dvfs in [dvfs]:
    
    for qps in [qps]:
        logs_dir = app_dir + qps + '_qps/linux_' + app + '_dmesg_' + run + '_' + dvfs + '_' + rapl + '_' + qps + '/'
        rdtsc_dir = app_dir + qps + '_qps/linux_' + app + '_rdtsc_' + run + '_' + dvfs + '_' + rapl + '_' + qps +'/'
        
        itrs = list_itrs(rdtsc_dir)
        cores = []
        cores = !ls $logs_dir | cut -d '_' -f2 | sort | uniq
    
        qps_dir = app_dir + qps + '_qps/'
        merged_dir = app_dir + qps + '_merged/'
        counters_df_outdir = merged_dir + qps + '_' + dvfs + '_counters_merged/'
        non_counters_df_outdir = merged_dir + qps + '_' + dvfs + '_non_counters_merged/'
        
        for itr in itrs:
            err_dir = app_dir + 'err_logs/'
            err_filename = 'err_log_' + run + '_' + itr + '_' + dvfs + '_' + rapl + '_' + qps[:-1] + '000'
            err_file = open(err_dir + err_filename, 'w')
            
            skip_log = False
            # check if merged log has been precomputed
            counters_outfile = counters_df_outdir + dvfs + '_' + qps + '_' + itr + '_counters_merged'
            non_counters_outfile = non_counters_df_outdir + dvfs + '_' + qps + '_' + itr + '_non_counters_merged'
            if (os.path.exists(counters_outfile) or os.path.exists(counters_outfile + '.gz')):
                if (os.path.exists(non_counters_outfile) or os.path.exists(non_counters_outfile + '.gz')):
                    print()
                    print(f'merged log exists for (DVFS={dvfs} ITR-DELAY={itr}, QPS={qps})..')
                    print('-------------------------------------------------------------- PRE-PARSED 16 LOGS ---------------------')
                    print()
                    skip_log = True
            
            if (skip_log == True):
                continue
                
            merged_counters_df, merged_non_counters_df, abort = concat_merge_core_logs(dvfs, qps, itr, app, run='0')
            if (abort == True):
                continue
            
            merged_counters_df['timestamp'] = merged_counters_df['timestamp'] - merged_counters_df['timestamp'].min()
            merged_non_counters_df['timestamp'] = merged_non_counters_df['timestamp'] - merged_non_counters_df['timestamp'].min()

            print_err_log(dvfs, qps, itr, err_dir)
            save_merged_logs(merged_counters_df, merged_non_counters_df, dvfs, qps, itr, app, rapl='135', run='0')
            print()
            print('----------------------------------------')
            print()
            
            err_file.close()


merged log exists for (DVFS=0x1d00 ITR-DELAY=100, QPS=600k)..
-------------------------------------------------------------- PRE-PARSED 16 LOGS ---------------------


merged log exists for (DVFS=0x1d00 ITR-DELAY=40, QPS=600k)..
-------------------------------------------------------------- PRE-PARSED 16 LOGS ---------------------


merged log exists for (DVFS=0x1d00 ITR-DELAY=400, QPS=600k)..
-------------------------------------------------------------- PRE-PARSED 16 LOGS ---------------------


merged log exists for (DVFS=0x1d00 ITR-DELAY=10, QPS=600k)..
-------------------------------------------------------------- PRE-PARSED 16 LOGS ---------------------


merged log exists for (DVFS=0x1d00 ITR-DELAY=250, QPS=600k)..
-------------------------------------------------------------- PRE-PARSED 16 LOGS ---------------------


merged log exists for (DVFS=0x1d00 ITR-DELAY=200, QPS=600k)..
-------------------------------------------------------------- PRE-PARSED 16 LOGS -----------------

mcd_0_rapl_135/600k_qps/linux_mcd_dmesg_0_0x1d00_135_600k/linux.mcd.dmesg.0_5_2_0x1d00_135_600000:	 81.7% -- replaced with mcd_0_rapl_135/600k_qps/linux_mcd_dmesg_0_0x1d00_135_600k/linux.mcd.dmesg.0_5_2_0x1d00_135_600000.gz

CORE:  5
         MERGED COUNTERS:  (237482, 6)
         MERGED NON COUNTERS:  (23176026, 5)
mcd_0_rapl_135/600k_qps/linux_mcd_dmesg_0_0x1d00_135_600k/linux.mcd.dmesg.0_6_2_0x1d00_135_600000.gz:	 81.7% -- replaced with mcd_0_rapl_135/600k_qps/linux_mcd_dmesg_0_0x1d00_135_600k/linux.mcd.dmesg.0_6_2_0x1d00_135_600000
mcd_0_rapl_135/600k_qps/linux_mcd_dmesg_0_0x1d00_135_600k/linux.mcd.dmesg.0_6_2_0x1d00_135_600000:	 81.7% -- replaced with mcd_0_rapl_135/600k_qps/linux_mcd_dmesg_0_0x1d00_135_600k/linux.mcd.dmesg.0_6_2_0x1d00_135_600000.gz

CORE:  6
         MERGED COUNTERS:  (257270, 6)
         MERGED NON COUNTERS:  (25100564, 5)
mcd_0_rapl_135/600k_qps/linux_mcd_dmesg_0_0x1d00_135_600k/linux.mcd.dmesg.0_7_2_0x1d00_135_600000.gz:	 81.7% -- replaced with mcd_0_rapl_135

In [13]:
# Display the merged counter frame left by the most recent concat_merge_core_logs call.
merged_counters_df

Unnamed: 0,instructions_diff,cycles_diff,ref_cycles_diff,llc_miss_diff,joules_diff,timestamp
0,1032804.0,2368319.0,2426024.0,5616.0,0.253150,0.000000
1,1087252.0,2746030.0,2876075.0,7528.0,0.206973,0.000002
2,1225372.0,2765001.0,2868216.0,7130.0,0.206973,0.000015
3,1198750.0,2777871.0,2879410.0,7176.0,0.206973,0.000020
4,1013150.0,2634384.0,2726783.0,7344.0,0.253150,0.000022
...,...,...,...,...,...,...
316644,509498.0,1485485.0,1485409.0,3353.0,0.247538,20.003176
316645,533796.0,1428733.0,1428685.0,3039.0,0.247538,20.003187
316646,533315.0,1376728.0,1376688.0,3398.0,0.264740,20.003206
316647,595259.0,1611807.0,1611791.0,4182.0,0.247538,20.003210


In [197]:
# Display the merged non-counter frame left by the most recent concat_merge_core_logs call.
merged_non_counters_df

Unnamed: 0,rx_bytes,rx_desc,tx_bytes,tx_desc,timestamp
0,66.0,1.0,74.0,2.0,0.000000
1,66.0,1.0,74.0,2.0,0.000023
2,66.0,1.0,74.0,2.0,0.000045
3,66.0,1.0,74.0,2.0,0.000065
4,66.0,1.0,74.0,2.0,0.000086
...,...,...,...,...,...
1403663,0.0,0.0,90.0,3.0,20.000213
1403664,66.0,1.0,0.0,0.0,20.000214
1403665,66.0,1.0,0.0,0.0,20.000220
1403666,66.0,1.0,180.0,6.0,20.000225


In [119]:
# # given (dvfs, qps, itr), concatenate all per-core logs into one big dataframe
# def concat_core_logs(dvfs, qps, itr, rapl='135', run='0'):
#     print('Concatenating all per-core logs with ITR-DELAY = ', itr)
    
#     # here are all the log files for this dvfs & qps
#     logs_dir = qps + '_qps/linux_mcd_dmesg_' + run + '_' + dvfs + '_' + rapl + '_' + qps + '/'
#     # here are all the time-management files for this dvfs & qps
#     rdtsc_dir = qps + '_qps/linux_mcd_rdtsc_' + run + '_' + dvfs + '_' + rapl + '_' + qps +'/'
#     rdtsc_file = rdtsc_dir + 'linux.mcd.rdtsc.' + run + '_' + itr + '_' + dvfs + '_' + rapl + '_' + qps[:-1] + '000'

#     start, end = eigen_analysis.get_rdtsc(rdtsc_file)
    
#     # initializing error log file
#     err_dir = 'err_logs/'
#     err_filename = 'err_log_' + run + '_' + itr + '_' + dvfs + '_' + rapl + '_' + qps[:-1] + '000'
#     err_file = open(err_dir + err_filename, 'w')

#     # here will be stored counter-based log data from all cores
#     counters_full_df = pd.DataFrame()
#     # here will be stored non-counter-based log data from all cores
#     non_counters_full_df = pd.DataFrame()
    
#     # TODO remove fixed core-id range
#     for c in range(0,16):
#         file = logs_dir + 'linux.mcd.dmesg.' + run + '_' + str(c) + '_' + itr + '_' + dvfs + '_' + rapl + '_' + qps[:-1] + '000'
#         df = pd.read_csv(file, sep = ' ', names = cols, index_col='i')
#         df = df[(df['timestamp'] >= start) & (df['timestamp'] <= end)]
#         df['timestamp'] = df['timestamp'] - df['timestamp'].min()
#         df['timestamp'] = df['timestamp'] * time_unit
#         df['joules'] = df['joules'] * joules_unit

#         # CONCATENATING MILLISECOND-LEVEL PER-CORE DFS
#         ##############################################
#         # removing zero-filled log-entries
#         # -> these represent interrupt occurrences at a frequency greater than per-1ms
#         counters_df = df[['joules', 'instructions', 'cycles', 'ref_cycles', 'llc_miss', 'timestamp']].copy()
#         counters_df = counters_df[(counters_df['joules'] > 0) & (counters_df['instructions'] > 0) \
#                                             & (counters_df['cycles'] > 0) & (counters_df['ref_cycles'] > 0) \
#                                             & (counters_df['llc_miss'] > 0)]

#         tmp = counters_df['timestamp']        
#         # computing diffs of counter readings
#         df_diffs = counters_df.diff().dropna().copy()
#         df_diffs.columns = [f'{c}_diff' for c in df_diffs.columns]
#         df_diffs = handle_neg_diffs(df_diffs, counters_df, c, err_file)
#         df_diffs = df_diffs.drop(['timestamp_diff'], axis=1)
#         df_diffs['timestamp'] = tmp
        
#         if counters_full_df.shape[0] == 0:
#             counters_full_df = df_diffs.copy()
#         else:
#             counters_full_df = counters_full_df.merge(df_diffs, \
#                                                       left_on = 'timestamp', \
#                                                       right_on = 'timestamp', \
#                                                       how='outer', \
#                                                       sort=True, \
#                                                       suffixes=('', '_0')).fillna(0)        

#         # CONCATENATING MICROSECOND-LEVEL PER-CORE DFS
#         ##############################################
#         non_counters_df = df[['rx_bytes', 'rx_desc', 'tx_bytes', 'tx_desc', 'timestamp']].copy()                                                          

#         if non_counters_full_df.shape[0] == 0:
#             non_counters_full_df = non_counters_df.copy()
#         else:
#             non_counters_full_df = non_counters_full_df.merge(non_counters_df, \
#                                                               left_on = 'timestamp', \
#                                                               right_on = 'timestamp', \
#                                                               how='outer', \
#                                                               sort=True, \
#                                                               suffixes=('', '_0')).fillna(0)         
            
#         print('CORE: ', str(c))
#         print('         NON COUNTERS:  full =', non_counters_df.shape[0], \
#               '  expected:', int(20 * 10**6 / int(itr)))        
#         print('         COUNTERS:      full =', counters_df.shape[0], \
#               '  after computing diffs =', df_diffs.shape[0]          \
#              )

#     # delete error log if empty
#     err_file.close()
#     if (os.path.getsize(err_dir + err_filename) == 0):
#         os.remove(err_dir + err_filename)
        
#     print()
#     print('-------------------------------------------------- PARSED 16 LOGS -------------------------')
#     print()
#     return counters_full_df, non_counters_full_df

In [120]:
# def merge_concat_logs(counters_full_df, non_counters_full_df):
    
#     # creating dfs of average per-core log readings
#     df_merged_counter = pd.DataFrame(columns=['instructions_diff', 'cycles_diff', 'ref_cycles_diff', \
#                                      'llc_miss_diff', 'joules_diff'])
#     df_merged_non_counter = pd.DataFrame(columns=['rx_bytes', 'rx_desc', 'tx_bytes', 'tx_desc'])

#     for col in df_merged_counter.columns:
#         df_merged_counter[col] = (counters_full_df[[col, col+'_0']].sum(axis=1))
#     df_merged_counter['timestamp'] = counters_full_df['timestamp']
#     for col in df_merged_non_counter.columns:
#         df_merged_non_counter[col] = (non_counters_full_df[[col, col+'_0']].sum(axis=1))
#     df_merged_non_counter['timestamp'] = non_counters_full_df['timestamp']
    
#     return df_merged_counter, df_merged_non_counter