In [1]:
%matplotlib inline
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import os
import itertools
import json
import re
import pandas as pd

In [2]:
def get_experiments(results_path):
    """Discover the experiments stored under each suite directory.

    Every sub-directory of ``results_path`` is treated as a suite. Inside a
    suite, entries whose name ends in ``.runNN`` (two digits) are run
    folders; the experiment name is that entry name with its last six
    characters (the ``.runNN`` suffix) removed.

    Parameters
    ----------
    results_path : str
        Directory containing one sub-directory per suite.

    Returns
    -------
    dict
        Maps suite name to the sorted list of distinct experiment names
        found in that suite (an empty list if it has no run folders).
    """
    run_suffix = re.compile(r'\.run\d\d$')
    suffix_length = 6  # number of characters in '.runNN'

    by_suite = {}
    for entry in os.listdir(results_path):
        suite_dir = os.path.join(results_path, entry)
        if not os.path.isdir(suite_dir):
            # Plain files next to the suite folders are not suites.
            continue

        # A set collapses the repeated runs of one experiment to one name.
        names = {child[:-suffix_length]
                 for child in os.listdir(suite_dir)
                 if run_suffix.search(child)}
        by_suite[entry] = sorted(names)

    return by_suite

In [3]:
# Root folder of the benchmark results (one sub-directory per suite).
# NOTE: machine-specific absolute path; adjust it when running elsewhere.
results_path = '/Users/lauritz/Desktop/ilya_benchmark_results'
# Maps suite name -> sorted experiment names; read later by parse_suite.
experiments = get_experiments(results_path)
# Function-call form prints the same text on Python 2 and also parses on
# Python 3, where the bare ``print x`` statement is a syntax error.
print(experiments)

{'tpchq10.cluster': ['tpchq10.flink.top004', 'tpchq10.flink.top008', 'tpchq10.flink.top012', 'tpchq10.flink.top016', 'tpchq10.flink.top020', 'tpchq10.flink.top024', 'tpchq10.flink.top028', 'tpchq10.flink.top032', 'tpchq10.flink.top036', 'tpchq10.flink.top040', 'tpchq10.flink.top044', 'tpchq10.flink.top048', 'tpchq10.flink.top052', 'tpchq10.flink.top056', 'tpchq10.flink.top060'], 'cc.cluster.webgraph-uk-2007-05.multijob': ['cc.flink.webgraph-uk-2007-05.multijob.top004', 'cc.flink.webgraph-uk-2007-05.multijob.top008', 'cc.flink.webgraph-uk-2007-05.multijob.top012', 'cc.flink.webgraph-uk-2007-05.multijob.top016', 'cc.flink.webgraph-uk-2007-05.multijob.top020', 'cc.flink.webgraph-uk-2007-05.multijob.top024', 'cc.flink.webgraph-uk-2007-05.multijob.top028'], 'kmeans.full.wally': ['kmeans.multi.4216562650points.top004', 'kmeans.multi.4216562650points.top008', 'kmeans.multi.4216562650points.top012', 'kmeans.multi.4216562650points.top016', 'kmeans.multi.4216562650points.top020', 'kmeans.multi.4

In [4]:
def parse_flink_memory_statistics(log_file):
    """Extract the memory samples reported in one Flink log file.

    Two kinds of lines are recognised:

    * lines containing ``HEAP: <used>/<n>/<n> MB``, where ``<used>`` is
      captured;
    * lines ending in ``Used Memory: <used>``.

    The k-th match of each kind is paired into row k of the result, so both
    kinds must occur equally often in the file.

    Parameters
    ----------
    log_file : str
        Path of the log file to read.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_samples, 2)``; column 0 holds the ``HEAP`` values
        and column 1 the ``Used Memory`` values. A file without any matching
        line yields an empty ``(0, 2)`` array.

    Raises
    ------
    ValueError
        If the two kinds of lines do not occur the same number of times.
    """
    # NOTE(review): the leading '.*' is greedy, so on a line with several
    # 'HEAP: a/b/c MB' groups the last fully matching group wins -- confirm
    # this is the intended one for the log format in use.
    patterns = (
        re.compile(r'.*HEAP: (?P<used>\d+?)/\d+/\d+ MB.*'),
        re.compile(r'.*Used Memory: (?P<used>\d+?)$'),
    )

    # Collect into real lists in a single pass over the file. Lazy
    # map/filter objects (Python 3) would be stacked by numpy as opaque
    # objects instead of numbers.
    columns = [[] for _ in patterns]
    with open(log_file) as f:
        for line in f:
            for column, pattern in zip(columns, patterns):
                match = pattern.match(line)
                if match is not None:
                    column.append(int(match.group('used')))

    counts = [len(column) for column in columns]
    if len(set(counts)) != 1:
        raise ValueError(
            'unequal number of memory samples in %s: %s' % (log_file, counts))

    return np.vstack(columns).T

def get_flink_statistics(run_path):
    """Average the Flink memory samples of one run.

    Every ``*.log`` file in ``<run_path>/logs/flink/flink-1.0.0`` is parsed
    with ``parse_flink_memory_statistics``. The samples of all files are
    stacked and averaged together, so each sample carries equal weight
    regardless of which log it came from.

    Parameters
    ----------
    run_path : str
        Folder of a single run.

    Returns
    -------
    numpy.ndarray
        Column-wise mean of the stacked samples, one value per column
        produced by ``parse_flink_memory_statistics``.
    """
    logs_dir = os.path.join(run_path, 'logs', 'flink', 'flink-1.0.0')

    per_log_samples = [
        parse_flink_memory_statistics(os.path.join(logs_dir, name))
        for name in os.listdir(logs_dir)
        if name.endswith('.log')
    ]

    stacked = np.vstack(per_log_samples)
    return stacked.mean(axis=0)

In [5]:
def get_dstat_statistics(run_path):
    """Average the selected dstat metrics of one run.

    Every file in ``<run_path>/logs/dstat/dstat-0.7.2`` is read as CSV. The
    first six lines are skipped, the following line is used as the header,
    and only the columns at positions 1-6 and 49-56 are kept (14 columns).
    The rows of all files are stacked and averaged column-wise.

    Parameters
    ----------
    run_path : str
        Folder of a single run.

    Returns
    -------
    numpy.ndarray
        Array with 14 entries: the mean of each selected column over all
        rows of all log files.

    Raises
    ------
    ValueError
        If the dstat folder contains no log file.
    """
    dstat_logs_folder = os.path.join(run_path, 'logs', 'dstat', 'dstat-0.7.2')

    # Built once, outside the loop. The list() calls keep the concatenation
    # valid on Python 3, where range objects do not support '+'.
    # Presumably these are the usr..siq and used..writ columns named in the
    # column legend of parse_suite -- verify against the dstat invocation.
    cols = list(range(1, 7)) + list(range(49, 57))

    data = []
    for dstat_log in os.listdir(dstat_logs_folder):
        log_file = os.path.join(dstat_logs_folder, dstat_log)
        df = pd.read_csv(log_file, skiprows=6, header=0, usecols=cols)
        data.append(df.values)

    if not data:
        # np.vstack([]) would raise a ValueError as well, but without saying
        # which folder was empty.
        raise ValueError('no dstat logs found in %s' % dstat_logs_folder)

    return np.mean(np.vstack(data), axis=0)

In [6]:
def get_median_run(exp_path):
    """Pick the representative run among the three runs of an experiment.

    The runs are expected in ``<exp_path>.run01`` to ``<exp_path>.run03``,
    each holding a ``state.json`` with the keys ``runExitCode`` and
    ``runTime``. A run with a non-zero exit code is given an infinite
    runtime so that it is only selected when every run failed.

    NOTE: despite the function name, the run with the *smallest* runtime is
    selected (``np.argmin``), not the median one.

    Parameters
    ----------
    exp_path : str
        Path of the experiment without the ``.runNN`` suffix.

    Returns
    -------
    tuple
        ``(run_path, runtime)`` of the selected run. If all three runs
        failed this is the first run together with ``np.inf``.
    """
    run_paths = ['%s.run%02d' % (exp_path, number) for number in (1, 2, 3)]

    runtimes = []
    for run_path in run_paths:
        with open(os.path.join(run_path, 'state.json')) as f:
            state = json.load(f)
        succeeded = state['runExitCode'] == 0
        # 'runTime' is only looked up for successful runs; failed runs are
        # kept in the list as np.inf rather than aborting the analysis.
        runtimes.append(state['runTime'] if succeeded else np.inf)

    fastest = np.argmin(runtimes)
    return run_paths[fastest], runtimes[fastest]

In [7]:
def parse_suite(suite):
    """Build the statistics table for all experiments of a suite.

    Reads the notebook-level ``experiments`` mapping and ``results_path``.
    For every experiment the run chosen by ``get_median_run`` is analysed
    with ``get_dstat_statistics`` and ``get_flink_statistics``.

    Parameters
    ----------
    suite : str
        Key into the global ``experiments`` dictionary.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_experiments, 17)``, one row per experiment in
        the order of ``experiments[suite]``. Column layout:
        runtime usr sys idl wai hiq siq used buff cach free recv send read
        writ memheap memdirect
    """
    suite_experiments = experiments[suite]

    # Column 0: runtime, columns 1-14: dstat means, columns 15-16: flink means.
    n_columns = 17
    table = np.zeros((len(suite_experiments), n_columns))

    # Iterating over the table yields row views, so the assignments below
    # write straight into ``table``.
    for row, name in zip(table, suite_experiments):
        exp_path = os.path.join(results_path, suite, name)
        run_path, runtime = get_median_run(exp_path)
        dstat_means = get_dstat_statistics(run_path)
        flink_means = get_flink_statistics(run_path)

        row[0] = runtime
        row[1:15] = dstat_means
        row[15:17] = flink_means

    return table

In [8]:
# Build one statistics table per benchmark suite. Each call returns an array
# with one row per experiment of that suite and 17 columns (see parse_suite).
tpchq10 = parse_suite('tpchq10.cluster')
kmeans = parse_suite('kmeans.full.wally')
cc = parse_suite('cc.cluster.webgraph-uk-2007-05.multijob')
# Store the three tables in a single archive, written relative to the current
# working directory, under the keys 'tpchq10', 'kmeans' and 'cc'.
np.savez('experiments.npz', tpchq10=tpchq10, kmeans=kmeans, cc=cc)