In [1]:
# Initial setup

# `plt_inline` acts as a sentinel: merely referencing it raises NameError when
# it has not been defined yet, so the magic below runs only the first time this
# cell is executed in a kernel session.
try:
    plt_inline
except NameError:
    # Avoid small fonts when inlining again
    %matplotlib inline
    plt_inline = True
    
from inc.notebook001 import *

In [2]:
## Load K-means experiments
# The parsed results are materialised into a tuple because later cells iterate
# over `apps` several times.
# NOTE(review): `SparkParser` is not defined in this notebook; presumably it
# comes from the star import of `inc.notebook001` -- confirm.
parser = SparkParser()
apps = tuple(parser.parse_folder('data/hibench/kmeans'))
print('{} logs/executions read.'.format(len(apps)))

740 logs/executions read.
CPU times: user 22.2 s, sys: 1.41 s, total: 23.6 s
Wall time: 23.6 s


## When data is read from HDFS

Among all the methods of reading data below, *Memory* is much faster than *Hadoop* or *Network*. At first, the duration prediction will deal with memory-only access. In order to do that, we will predict when data is read from *Hadoop* or *Network*. The prediction is based on the input size per worker of the profiling executions.

In [3]:
# Different methods of reading data
# Collects the distinct `data_read_method` values over every successful task of
# every stage of every application; the set is the cell's displayed output.
set(t.metrics.data_read_method
    for a in apps
        for s in a.stages
            for t in s.successful_tasks)

{'Network', 'Hadoop', None, 'Memory'}

In [6]:
def not_from_memory(stage):
    """Tell whether any successful task of ``stage`` read from Hadoop or Network.

    Args:
        stage: Object exposing ``successful_tasks``; every task must provide
            ``metrics.data_read_method``.

    Returns:
        bool: True if at least one task used 'Hadoop' or 'Network',
        False otherwise (including when there are no successful tasks).
    """
    external_sources = ['Hadoop', 'Network']
    return any(task.metrics.data_read_method in external_sources
               for task in stage.successful_tasks)

def is_profiling(app, max_records=16384000):
    """Check if app is part of the profiling phase.

    An execution is considered a profiling run when its first stage read
    strictly fewer than ``max_records`` records.

    Args:
        app: Execution exposing ``stages``; only ``stages[0].records_read``
            is inspected.
        max_records (int): Exclusive upper bound on the records read by the
            first stage. Defaults to 16384000, the threshold that used to be
            hard-coded here.

    Returns:
        bool: True when the first stage read fewer than ``max_records``
        records.
    """
    return app.stages[0].records_read < max_records

def get_min_size(apps):
    """Minimum input size per worker when memory is not enough.

    For every app, the input size per worker is the bytes read by the first
    stage divided by the number of slaves. Only apps with at least one later
    stage flagged by ``not_from_memory`` are taken into account.

    Args:
        apps: Iterable of executions exposing ``stages`` and ``slaves``.

    Returns:
        The smallest per-worker input size among the qualifying apps, or
        None when no app qualifies.
    """
    candidates = []
    for app in apps:
        per_worker = app.stages[0].bytes_read / app.slaves
        if any(not_from_memory(later) for later in app.stages[1:]):
            candidates.append(per_worker)
    return min(candidates) if candidates else None

def predict_non_memory(apps, min_size):
    """Maximum number of workers when memory is not enough.

    Every distinct (input size, record count) pair found in the first stage
    of the given apps is converted into the largest number of workers for
    which the input per worker is still at least ``min_size``.

    Args:
        apps: Iterable of executions exposing ``stages``; only
            ``stages[0].bytes_read`` and ``stages[0].records_read`` are used.
        min_size (int): Minimum input size per worker that exceeds memory capacity.
            A falsy value (None or 0) means no such threshold is known, in
            which case nothing can be predicted and an empty list is returned.

    Returns:
        list: ``(records, max_workers)`` tuples sorted by record count. Inputs
        smaller than ``min_size`` (zero workers) are left out.
    """
    if not min_size:
        # No threshold available (e.g. no profiling run left memory).
        return []
    distinct_inputs = set(
        (a.stages[0].bytes_read, a.stages[0].records_read) for a in apps)
    res = []
    # Sort so the result does not depend on set iteration order.
    for total_input, records in sorted(distinct_inputs,
                                       key=lambda pair: (pair[1], pair[0])):
        max_slaves = int(total_input / min_size)
        if max_slaves > 0:
            res.append((records, max_slaves))
    return res
                
# Only profiling executions for prediction
# NOTE(review): get_min_size returns None when no profiling execution has a
# later stage flagged by not_from_memory; the division in the print below would
# then raise a TypeError.
min_size = get_min_size(app for app in apps if is_profiling(app))
print('Based on the profiling executions, data may not fit in memory when input size is'
      ' {:.2f} MB/worker or more.'.format(min_size / 1024**2))

# The prediction itself uses all executions, not only the profiling ones.
non_memory = predict_non_memory(apps, min_size)
# Displayed as a table indexed (and sorted) by the number of samples.
pd.DataFrame.from_records(non_memory, columns=['samples', 'max workers'], index='samples'). \
    sort_index()

When input size is 784.45 MB/worker, data may not fit in memory. Based on this threshold, the table below shows the maximum number of workers that causes access to the disk or network.


Unnamed: 0_level_0,max workers
samples,Unnamed: 1_level_1
4096000,1
16384000,4
65536000,16


From the table above, we should use, for example, more than 4 workers if we want 16,384,000 samples to be processed completely from memory.

## DEPRECATED SECTIONS BELOW
## Regression on stage read data

In [None]:
def is_target(app):
    """Return True when the first stage of ``app`` read more than 8 * 10**6 bytes."""
    first_stage = app.stages[0]
    return first_stage.bytes_read > 8 * 10**6

def get_df_records(apps):
    """Build rows and column names describing the data moved by each stage.

    All apps must have the same number of stages, otherwise the assertion
    fails. ``apps`` is traversed twice, so it must be re-iterable.

    Args:
        apps: Sequence of executions exposing ``slaves`` and ``stages``; each
            stage provides ``bytes_read`` and ``bytes_written`` and the first
            stage also ``records_read``.

    Returns:
        tuple: ``(rows, cols)``. ``cols`` is
        ``['workers', 'samples', 'stage0read', 'stage0written', 'stage1read', ...]``
        and every row holds the number of slaves, the first stage's records
        in millions, then bytes read/written divided by 1024 for each stage.
    """
    stage_counts = {len(app.stages) for app in apps}
    assert len(stage_counts) == 1, "{} stages found".format(len(stage_counts))
    n_stages = stage_counts.pop()

    cols = ['workers', 'samples']
    for stage_idx in range(n_stages):
        for kind in ('read', 'written'):
            cols.append('stage{:d}{}'.format(stage_idx, kind))

    rows = []
    for app in apps:
        # samples in millions
        head = [app.slaves, app.stages[0].records_read / 10**6]
        # read/written bytes divided by 1024, stage by stage
        sizes = [amount / 1024
                 for stage in app.stages
                 for amount in (stage.bytes_read, stage.bytes_written)]
        rows.append(head + sizes)
    return rows, cols

In [None]:
# Build the module-level `df` that plot_stage reads: one row per execution.
records, cols = get_df_records(apps)
df = pd.DataFrame.from_records(records, columns=cols)

In [None]:
# Build graphs to understand how data grows as we increase the number of samples
def plot_stage(stage_id):
    """Scatter-plot the data read and written by one stage against samples.

    Reads the module-level ``df`` and considers all numbers of workers:

    - x: number of samples
    - y1: data read (scatterplot, default colour)
    - y2: data written (scatterplot, red)

    A straight line fitted on the rows whose ``samples`` value is below 16 is
    added through ``plot_data_size_prediction`` and the figure is shown.

    Args:
        stage_id (int): Stage number used to pick the ``stage<N>read`` and
            ``stage<N>written`` columns.
    """
    samples_col = 'samples'
    read_col = 'stage{:d}read'.format(stage_id)
    written_col = 'stage{:d}written'.format(stage_id)
    opacity = 0.5

    source_df = df  # an optional filter used to live here: [df[x] < 16]
    graph_df = pd.DataFrame()
    graph_df[samples_col] = source_df[samples_col]
    for size_col in (read_col, written_col):
        graph_df[size_col] = source_df[size_col] / 1024  # to MB

    # Written data first (red), then read data on the same axes.
    axes = graph_df.plot.scatter(samples_col, written_col, s=60, c='r', alpha=opacity)
    graph_df.plot.scatter(samples_col, read_col, s=60, alpha=opacity, ax=axes)

    below_16 = graph_df.samples < 16
    fit_df = graph_df[[samples_col, read_col]][below_16]
    plot_data_size_prediction(axes, fit_df, graph_df[samples_col].max())

    plt.tight_layout()
    plt.show()

    #list_points(graph_df[[x, y1]])
    #list_points(graph_df[[x, y2]])
    
import numpy as np

def plot_data_size_prediction(ax, df, x_max):
    """Draw on ``ax`` the straight line that best fits the points in ``df``.

    Args:
        ax: Axes-like object; only ``ax.plot(xs, ys)`` is called.
        df: Two-column DataFrame; the first column holds the x values and the
            second one the y values used for the least-squares fit.
        x_max: Right end of the plotted x range (the line starts at 0). It
            may be a non-integer number.
    """
    x, y = df.columns
    z = np.polyfit(df[x], df[y], 1)
    p = np.poly1d(z)
    # np.linspace requires an integer number of points, while x_max is
    # typically a float; round it up and keep at least the two end points.
    n_points = max(int(np.ceil(x_max)), 2)
    xs = np.linspace(0, x_max, n_points)
    ax.plot(xs, p(xs))
    
from IPython.display import display
    
def list_points(df):
    """Display how often each second-column value occurs per first-column value.

    Args:
        df: Two-column DataFrame; rows are grouped by the first column and
            the values of the second column are counted within each group.
    """
    key_col, value_col = df.columns

    def _count_values(group):
        return group[value_col].value_counts()

    display(df.groupby(key_col).apply(_count_values))

import matplotlib.pyplot as plt
    
# One figure per stage; the number of stages is taken from the first app.
for stage in range(len(apps[0].stages)):
    plot_stage(stage)
    

Minimum data size per worker when there's an HDFS read, and maximum data size per worker when only memory is used. Presented by stage, except for stage 0 (initial data is always read from disk).

In [None]:
def most_common(values):
    """Describe the most frequent value of ``values``.

    Args:
        values: Sized collection of hashable numbers (the value is divided
            by 1024**2 before being reported as MB).

    Returns:
        str: ``'<occurrences>/<total>: <value in MB> MB'`` for the most
        frequent value.
    """
    top_value, occurrences = Counter(values).most_common(1)[0]
    size_mb = top_value / 1024**2
    return '{}/{}: {:.2f} MB'.format(occurrences, len(values), size_mb)

# One record per (app, stage) pair, skipping stage 0: stage id, slaves, the
# first stage's bytes read per slave and whether the stage is flagged by
# not_from_memory (column 'HDFS', which therefore also covers 'Network' reads).
cols = ['stage', 'slaves', 'input per worker', 'HDFS']
records = ((stage.id, app.slaves, app.stages[0].bytes_read / app.slaves, not_from_memory(stage))
           for app in apps
               for stage in app.stages[1:])
_df = pd.DataFrame.from_records(records, columns=cols)
# Smallest input per worker (converted to MB) among flagged stages, per (stage, slaves).
min_hdfs = _df[_df['HDFS']].drop('HDFS', axis=1).groupby(cols[:2]).agg(min). \
    rename(columns={'input per worker': 'min HDFS (MB/worker)'}) / 1024**2
# Largest input per worker (converted to MB) among non-flagged stages, per (stage, slaves).
max_mem = _df[~_df['HDFS']].drop('HDFS', axis=1).groupby(cols[:2]).agg(max). \
    rename(columns={'input per worker': 'max mem (MB/worker)'}) / 1024**2

# TODO all sizes per stage
def unique_str(values):
    """Join the distinct sizes of ``values``, in MB, from largest to smallest.

    Args:
        values: Iterable of sizes; each one is divided by 1024**2 and rounded
            to two decimals before duplicates are removed.

    Returns:
        str: The distinct rounded sizes separated by ``', '``.
    """
    distinct_mbs = {round(value / 1024**2, 2) for value in values}
    return ', '.join(map(str, sorted(distinct_mbs, reverse=True)))

# Every distinct input size per worker, as text, for each (stage, slaves) group.
all_sizes = _df.drop('HDFS', axis=1).groupby(cols[:2]).agg(unique_str). \
    rename(columns={'input per worker': 'all sizes (MB/worker)'})

# More detailed table
# Only groups present in all three frames survive the dropna().
# NOTE(review): `_df` and `cols` are rebound here to different contents than
# earlier in this cell, so re-running parts of the cell out of order is unsafe.
_df = pd.concat([max_mem, min_hdfs, all_sizes], axis=1).dropna()
cols = ['stage', 'slaves', 'max mem (MB/worker)', 'min HDFS (MB/worker)', 'all sizes (MB/worker)']


# # Per-stage mininum size for HDFS
# def unique_str_split(values):
#     all_entries = set()
#     for val in values:
#         all_entries.update(val.split(', '))
#     numbers = (float(entry) for entry in all_entries)
#     ordered = sorted(numbers, reverse=True)
#     return ', '.join(str(n) for n in ordered)

# _df.reset_index().drop(cols[1:3], axis=1).groupby('stage').agg({
#         cols[3]: min, cols[-1]: unique_str_split})[cols[-2:]]

# NOTE(review): this value is neither stored nor displayed, since it is not the
# last expression of the cell.
_df[cols[-2]].min()

# Distinct (input bytes, slaves, records) combinations over all executions.
groups = set((app.stages[0].bytes_read, app.slaves, app.stages[0].records_read) for app in apps)

# Keep the combinations whose input per slave reaches `min_size`, a global
# computed in an earlier cell of this notebook.
disk = ((records, slaves) for size, slaves, records in groups
        if size / slaves >= min_size)
print('Possible disk access:')
pd.DataFrame.from_records(disk, columns=['records', 'max slaves for HDFS']).groupby('records').max()

In [None]:
# NOTE(review): leftover exploration -- this only references the bound
# `groupby` method of the first two columns without calling it.
_df.iloc[:,:2].groupby

In [None]:
# Print the number of slaves of every execution whose stage 13 read about
# 1104.75 MB per slave without any successful task reading from 'Hadoop'.
# NOTE(review): the stage index and the size are hard-coded values from an
# earlier investigation -- confirm they are still relevant.
for app in apps:
    stage = app.stages[13]
    size = stage.bytes_read / app.slaves / 1024**2
    # The Hadoop check is written inline because `is_read_from_hdfs` is only
    # defined in a later cell, which makes this cell fail on a fresh kernel.
    if abs(1104.75 - size) < 0.01 and 'Hadoop' not in (
            t.metrics.data_read_method for t in stage.successful_tasks):
        print(app.slaves)
#     elif abs(842.77 - size) < 0.01 and is_read_from_hdfs(stage):
#         print(app.filename)

In [None]:
def is_read_from_hdfs(stage):
    """Return True if any successful task of ``stage`` read its data via 'Hadoop'.

    Args:
        stage: Object exposing ``successful_tasks``; every task must provide
            ``metrics.data_read_method``.
    """
    return any(task.metrics.data_read_method == 'Hadoop'
               for task in stage.successful_tasks)

def get_records(apps):
    """Yield one summary tuple per (app, stage) pair, stage 0 included.

    Args:
        apps: Iterable of executions exposing ``stages`` and ``slaves``.

    Yields:
        tuple: ``(read from Hadoop?, first-stage bytes read, first-stage
        records read, slaves, stage id, first-stage bytes per slave divided
        by 1024**2, 1)``. The trailing 1 lets callers count rows by summing.
    """
    for app in apps:
        first_stage = app.stages[0]
        total_input = first_stage.bytes_read
        n_samples = first_stage.records_read
        for stage in app.stages:
            from_hadoop = is_read_from_hdfs(stage)
            per_slave_mb = total_input / app.slaves / 1024**2
            yield (from_hadoop, total_input, n_samples, app.slaves,
                   stage.id, per_slave_mb, 1)
            
# Column names follow the order of the tuples yielded by get_records.
cols = ['hadoop', 'input (stg 0)', 'samples', 'slaves', 'stage', 'input/slave', 'count (out of 10)']
_df = pd.DataFrame.from_records(get_records(apps), columns=cols).sort_values(cols[1:4])
# Rows read via Hadoop after stage 0, grouped by (input, samples, slaves,
# stage): smallest input per slave and number of such rows in each group.
# NOTE(review): the "out of 10" in the last column name presumably refers to
# the number of repetitions per configuration -- confirm.
_df[_df.hadoop & (_df.stage > 0)].drop('hadoop', axis=1).groupby(cols[1:5]).agg({cols[5]: min, cols[6]: sum})