#  Simulation Framework
## Idea is to estimate different levels of idle times at different levels of preemption

In [1]:
import os

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime, timedelta

## Average Performance Numbers from Profiling

In [2]:
BASE_DIR=os.path.join('checkpointing_baseline', 'async-checkpointing', 'run-86')

performance_stats = {}

### Reconfig Times

In [3]:
reconfig = pd.read_csv(os.path.join(BASE_DIR, 'reconfig.csv'), names=['ws', 'start', 'end', 'elapsed'])
reconfig = reconfig.drop([0])

In [4]:
mean_reconfig_time = reconfig['elapsed'].mean()
performance_stats['reconfiguration'] = mean_reconfig_time

### Iteration Times

In [5]:
loop_times = pd.read_csv(os.path.join(BASE_DIR, 'loop_times_0.csv'), names=['ws', 'step', 'loop start', 'loop end', 'train time', 'start chkpt thread', 'total loop time'])

In [6]:
ws_iter_times = {}
for ws in loop_times.ws.unique():
    filtered_df = loop_times[loop_times['ws'] == ws]
    ws_iter_times[ws] = filtered_df['train time'].mean()

In [7]:
performance_stats['iter_times'] = ws_iter_times

### Save Checkpoint

In [8]:
chkpt_times = pd.read_csv(os.path.join(BASE_DIR, 'async-checkpoint_0.csv'), names=['elapsed'])

In [9]:
mean_chkpt_time = chkpt_times['elapsed'].mean()
performance_stats['checkpoint'] = mean_chkpt_time

### Load Times

In [10]:
load_times = pd.read_csv(os.path.join(BASE_DIR, 'load_times_0.csv'), names=['ws', 'step', 'elapsed'])

In [11]:
mean_load_times = load_times['elapsed'].mean()
performance_stats['load checkpoint'] = mean_load_times

In [12]:
performance_stats

{'reconfiguration': 87.87109575271606,
 'iter_times': {16: 17.994171783779606,
  32: 11.380824764730477,
  48: 9.318327348709106,
  64: 8.282614257155346},
 'checkpoint': 88.99610565625704,
 'load checkpoint': 46.83130866289139}

## Global Functions + Importing Base Data

In [13]:
def ts_to_dt(ts, format="%Y-%m-%dT%H:%M:%SZ"):
    return datetime.strptime(ts, format)

In [14]:
real_node_trace_file = 'full-64-node-trace.csv'
real_terminate_trace_file = 'full-64-terminate-trace.csv'

In [15]:
node_trace = pd.read_csv(real_node_trace_file)
node_trace['dt'] = node_trace['timestamp'].apply(lambda x : ts_to_dt(x))
terminate_trace = pd.read_csv(real_terminate_trace_file)
terminate_trace['dt'] = terminate_trace['timestamp'].apply(lambda x: ts_to_dt(x))

## Analyzing Mean Time To Preemption (MTTP)

In [16]:
ranges = {'ranges': []}
for ind in range(len(node_trace) - 1):
    row = node_trace.iloc[ind]
    next_row = node_trace.iloc[ind + 1]

    ranges['ranges'].append((next_row['dt'] - row['dt']).total_seconds())

ranges_df = pd.DataFrame(ranges)
print("MTTP", ranges_df['ranges'].mean())
print("TTP Std:", np.sqrt(ranges_df['ranges'].var()))

MTTP 221.875
TTP Std: 171.06669982633582


## Slight Aside: Generate a "Node Additions" Trace

In [17]:
additions = {
    'timestamp': [],
    'dt': [],
    'count': []
}

for ind in range(len(node_trace)-1):
    row = node_trace.iloc[ind]
    next_row = node_trace.iloc[ind+1]

    if next_row['count'] > row['count']:
        additions['timestamp'].append(row['timestamp'])
        additions['dt'].append(ts_to_dt(row['timestamp']))
        additions['count'].append(next_row['count'] - row['count'])

In [18]:
additions_trace = pd.DataFrame(additions)
additions_trace.to_csv('full-64-additions-trace.csv')

## Generating actual numbers

In [19]:
total_preemptions = terminate_trace['number'].sum()
total_additions = additions_trace['count'].sum()
trace_start = node_trace.iloc[0]['dt']
trace_end = node_trace.iloc[len(node_trace)-1]['dt']
total_time_td = trace_end - trace_start
total_minutes = total_time_td.total_seconds() / 60

In [20]:
preemptions_per_minute = float(total_preemptions) / float(total_minutes)
additions_per_minute = float(total_additions) / float(total_minutes)

print(preemptions_per_minute)
print(additions_per_minute)

0.39436619718309857
0.5295774647887324


In [21]:
minutes_preemptions_occurred = len(terminate_trace)
minutes_additions_occurred = len(additions_trace)
chance_preempt_this_minute = float(minutes_preemptions_occurred) / total_minutes
chance_add_this_minute = float(minutes_additions_occurred) / total_minutes

print(chance_preempt_this_minute)
print(chance_add_this_minute)

0.16338028169014085
0.14366197183098592


In [22]:
big_adds = additions_trace[additions_trace['count'] > 10]
regular_adds = additions_trace[additions_trace['count'] <= 10]

num_big_additions = len(big_adds)
num_regular_additions = len(regular_adds)

probability_big_add = float(num_big_additions) / float(len(additions_trace))
probability_reg_add = float(num_regular_additions) / float(len(additions_trace))

assert probability_big_add + probability_reg_add == 1
print(probability_big_add)
print(probability_reg_add)

regular_add_mean = regular_adds['count'].mean()
regular_add_std = np.sqrt(regular_adds['count'].var())
print(regular_add_mean)
print(regular_add_std)

0.058823529411764705
0.9411764705882353
2.5416666666666665
1.9456261184790364


In [23]:
preempt_count_mean = terminate_trace['number'].mean()
preempt_count_std = np.sqrt(terminate_trace['number'].var())

print(preempt_count_mean)
print(preempt_count_std)

2.413793103448276
2.086165648825032


## Attempted recreation of a preemption trace

In [42]:
generated_preemption_trace = {
    'dt': [],
    'preemptions': []
}

generated_addition_trace = {
    'dt': [],
    'additions': []
}

curr_dt = datetime(2021, 9, 14, 17, 00)

current_instances = 32
inst_min = 0
inst_max = 64
for _ in range(int(total_minutes)):
    #print(f'Processing time {curr_dt.("%Y-%m-%d %H:%M:%S")}')
    preempt_now = np.random.rand()
    add_now = np.random.rand()
    if preempt_now < chance_preempt_this_minute:
        num_to_preempt = np.random.normal(preempt_count_mean, preempt_count_std, 1)[0]
        num_to_preempt = np.abs(np.round(num_to_preempt))
        if num_to_preempt == 0:
            num_to_preempt = 1
        #probs = np.random.rand(current_instances)
        #keep = probs > chance_preempt_this_minute
        generated_preemption_trace['dt'].append(curr_dt)
        generated_preemption_trace['preemptions'].append(num_to_preempt)

    if add_now < chance_add_this_minute:
        big_or_regular = np.random.random()
        num_to_add = None
        if big_or_regular < probability_big_add:
            num_to_add = np.random.randint(11, 32)
        else:
            num_to_add = np.random.normal(regular_add_mean, regular_add_std, 1)[0]
            num_to_add = np.abs(np.round(num_to_add))
            if num_to_add == 0:
                num_to_add = 1

        assert num_to_add != None
        generated_addition_trace['dt'].append(curr_dt)
        generated_addition_trace['additions'].append(num_to_add)

    curr_dt += timedelta(minutes=1)

In [43]:
gen_pre_df = pd.DataFrame(generated_preemption_trace)
gen_add_df = pd.DataFrame(generated_addition_trace)

gen_pre_df.to_csv('generated-preemptions-trace.csv')
gen_add_df.to_csv('generated-additions-trace.csv')

# Running the actual simulations

In [44]:
import random

In [45]:
def round_values(d):
    new_dict = {}
    for k, v in d.items():
        if isinstance(v, dict):
            new_dict[k] = round_values(v)
        else:
            new_dict[k] = np.round(v)
    
    return new_dict

performance_stats = round_values(performance_stats)
performance_stats

{'reconfiguration': 88.0,
 'iter_times': {16: 18.0, 32: 11.0, 48: 9.0, 64: 8.0},
 'checkpoint': 89.0,
 'load checkpoint': 47.0}

In [100]:
import glob

def get_next_dir():
    base_dir = 'simulation_results'
    os.makedirs(base_dir, exist_ok=True)

    existing_trials_so_far = glob.glob(os.path.join(base_dir, 'run-*'))
    existing_trials_so_far = [int(r[r.rfind('-')+1:]) for r in existing_trials_so_far]
    next_run = 0 if len(existing_trials_so_far) == 0 else max(existing_trials_so_far) + 1

    return os.path.join(base_dir, f'run-{next_run}')

def run_simulation(preemption_chance_per_minute, preemption_distribution, addition_chance_per_minute, addition_distribution):
    start = datetime(2021, 9, 14, 17, 00)
    end = start + timedelta(minutes=total_minutes)

    active_instances = set()
    standy_instances = set()
    prepping_instances = set()
    all_instances = set()

    prepping_timers = {}
    event_timers = {}
    current_state = 'training'
    event_id = 0

    preemption_chance = preemption_chance_per_minute / 12
    addition_chance = addition_chance_per_minute / 12

    current_instances = 32
    pipeline_depth = 16
    id = 0
    for _ in range(current_instances):
        all_instances.add(id)
        active_instances.add(id)
        id += 1

    step = 0
    trace = {
        'ws': [current_instances],
        'dt': [start],
        'event': ['step'],
        'event-id': [0]
    }

    reconfig_cnt = 0
    reconfig_end_time = None
    last_saved_iteration = 0
    checkpoint_running = False
    checkpoint_start = None
    checkpoint_step = 0

    inst_min = 0
    inst_max = 64
    curr_time = start
    exp_dir = get_next_dir()
    os.makedirs(exp_dir)
    f = open(os.path.join(exp_dir, 'simulation-output.txt'), 'w')
    while curr_time < end:
        f.write(f'{curr_time.strftime("%Y-%m-%d %H:%M:%S")} Cluster Config this step:\n')
        f.write(f'######## {len(active_instances)} active instances: {active_instances}\n')
        f.write(f'######## {len(standy_instances)} standby instances: {standy_instances}\n')
        f.write(f'######## {len(prepping_instances)} prepping instances: {prepping_instances}\n')
        f.write(f'######## {len(all_instances)} total instances: {all_instances}\n')
        f.write('========================================================================\n')

        num_to_add = 0
        add_this_round = np.random.random() < addition_chance
        if add_this_round:
            big_or_regular = np.random.random()
            if big_or_regular < 0.05:
                num_to_add = np.random.randint(11, 32)
                if len(all_instances) + num_to_add >= inst_max:
                    num_to_add = inst_max - len(all_instances)
            else:
                num_to_add = np.random.normal(addition_distribution['mean'], addition_distribution['std'])
                num_to_add = np.abs(np.round(num_to_add))
                if num_to_add == 0:
                    num_to_add = 1
                elif len(all_instances) + num_to_add >= inst_max:
                    num_to_add = inst_max - len(all_instances)

        num_to_preempt = 0
        preempt_this_round = np.random.random() < preemption_chance
        if preempt_this_round and not len(all_instances) <= inst_min:
            num_to_preempt = np.random.normal(preemption_distribution['mean'], preemption_distribution['std'])
            num_to_preempt = np.abs(np.round(num_to_preempt))
            if num_to_preempt == 0:
                num_to_preempt = 1

        preempted_instances = random.sample(all_instances, int(num_to_preempt))
        if len(preempted_instances) > 0:
            f.write(f'Preempting {len(preempted_instances)} with ids {preempted_instances}\n')
        lost_active = False
        for inst_id in preempted_instances:
            if inst_id in active_instances:
                lost_active = True
                active_instances.remove(inst_id)
            elif inst_id in standy_instances:
                standy_instances.remove(inst_id)
            elif inst_id in prepping_instances:
                prepping_instances.remove(inst_id)
                del prepping_timers[inst_id]
            else:
                print(f'What the F?')

            all_instances.remove(inst_id)

        done_prepping = []
        for inst_id in prepping_instances:
            time_spent_prepping = curr_time - prepping_timers[inst_id]
            if time_spent_prepping.total_seconds() >= 180:
                done_prepping.append(inst_id)

        if len(done_prepping) > 0:
            f.write(f'Moving {len(done_prepping)} instances to standby with ids {done_prepping}\n')
        for inst_id in done_prepping:
            standy_instances.add(inst_id)
            prepping_instances.remove(inst_id)
            del prepping_timers[inst_id]

        if num_to_add > 0:
            f.write(f'Adding {num_to_add} new instances with sart id {id}\n')
        for _ in range(int(num_to_add)):
            all_instances.add(id)
            prepping_instances.add(id)
            prepping_timers[id] = curr_time
            id += 1

        new_pipeline_avail = len(standy_instances) >= pipeline_depth

        last_event = {
            'ws': trace['ws'][-1],
            'dt': trace['dt'][-1],
            'event': trace['event'][-1],
            'event-id': trace['event-id'][-1]
        }
        if checkpoint_running:
            checkpoint_end_time = checkpoint_start + timedelta(seconds=performance_stats['checkpoint'])
            if curr_time > checkpoint_end_time:
                f.write(f'Finished checkpoint {checkpoint_step}\n')
                last_saved_iteration = checkpoint_step
                checkpoint_running = False

        if last_event['event'] == 'step':
            iter_end_time = last_event['dt'] + timedelta(seconds=performance_stats['iter_times'][last_event['ws']])
            if curr_time > iter_end_time:
                if not checkpoint_running:
                    checkpoint_start = iter_end_time
                    checkpoint_step = step
                    checkpoint_running = True

                step += 1
                trace['ws'].append(last_event['ws'])
                trace['dt'].append(iter_end_time)
                trace['event'].append('step')
                trace['event-id'].append(step)

        elif last_event['event'] == 'reconfig':
            if lost_active or new_pipeline_avail:
                reconfig_end_time += timedelta(seconds=performance_stats['reconfiguration'])
            if curr_time > reconfig_end_time:
                step = last_saved_iteration + 1
                trace['ws'].append(len(active_instances))
                trace['dt'].append(reconfig_end_time)
                trace['event'].append('step')
                trace['event-id'].append(step)
                reconfig_end_time = None
        else:
            print('huh?')

        if lost_active or new_pipeline_avail:
            trace['ws'].append(len(active_instances))
            trace['dt'].append(curr_time)
            trace['event'].append('reconfig')
            trace['event-id'].append(reconfig_cnt)
            reconfig_cnt += 1
            if reconfig_end_time == None:
                reconfig_end_time = curr_time + timedelta(seconds=performance_stats['reconfiguration'])

            checkpoint_running = False

            if lost_active and not new_pipeline_avail:
                f.write('Lost active instance but no new pipeline avail: ')
                diff = len(active_instances) % pipeline_depth
                if len(standy_instances) > pipeline_depth - diff:
                    f.write('case 1a\n')
                    n_to_move = pipeline_depth - diff
                    to_move = random.sample(standy_instances, n_to_move)
                    for k in to_move:
                        active_instances.add(k)
                        standy_instances.remove(k)
                else:
                    f.write('case 1b\n')
                    to_move = random.sample(active_instances, diff)
                    for k in to_move:
                        standy_instances.add(k)
                        active_instances.remove(k)

            elif not lost_active and new_pipeline_avail:
                f.write('No instance lost but new pipeline is availalbe\n')
                f.write(f'LEN STDBY: {len(standy_instances)}, pipe depth: {pipeline_depth}, ')
                n_to_move = (len(standy_instances) // pipeline_depth) * pipeline_depth
                f.write(f'N_TO_MOVE: {n_to_move}\n')
                to_move = random.sample(standy_instances, n_to_move)
                for k in to_move:
                    active_instances.add(k)
                    standy_instances.remove(k)

            else:
                f.write('Both lost an instance AND got a new pipeline\n')
                diff = len(active_instances) % pipeline_depth
                n_to_move = ((len(standy_instances) // pipeline_depth) * pipeline_depth) - diff
                to_move = random.sample(standy_instances, n_to_move)
                for k in to_move:
                    active_instances.add(k)
                    standy_instances.remove(k)

                while len(standy_instances) >= pipeline_depth:
                    move_pipeline = random.sample(standy_instances, pipeline_depth)
                    for k in move_pipeline:
                        active_instances.add(k)
                        standy_instances.remove(k)

        curr_time += timedelta(seconds=5)
        f.write('========================================================================\n')
        f.write('\n\n')

    print('done')
    return trace

In [101]:
trace = run_simulation(0.5, {'mean': 0, 'std': 1}, 0.5, {'mean': 0, 'std': 1})

since Python 3.9 and will be removed in a subsequent version.
  preempted_instances = random.sample(all_instances, int(num_to_preempt))
since Python 3.9 and will be removed in a subsequent version.
  to_move = random.sample(active_instances, diff)
since Python 3.9 and will be removed in a subsequent version.
  to_move = random.sample(standy_instances, n_to_move)
since Python 3.9 and will be removed in a subsequent version.
  to_move = random.sample(standy_instances, n_to_move)


KeyError: 0

In [60]:
trace_df = pd.DataFrame(trace)
trace_df

Unnamed: 0,ws,dt,event,event-id
0,32,2021-09-14 17:00:00,step,0
1,32,2021-09-14 17:00:11,step,1
2,32,2021-09-14 17:00:22,step,2
3,32,2021-09-14 17:00:33,step,3
4,32,2021-09-14 17:00:44,step,4
...,...,...,...,...
700,191,2021-09-14 22:41:55,reconfig,659
701,191,2021-09-14 22:49:25,reconfig,660
702,191,2021-09-14 22:50:55,reconfig,661
703,190,2021-09-14 22:51:20,reconfig,662
