In [1]:
import os
import numpy as np
import random
import scipy as sp
from matplotlib import pyplot as plt
import seaborn as sns
import itertools
import copy
import pandas as pd

In [2]:
def new_stage(id=None, completion_time=100, finishing_time=None):
    # completion = completion_time + random.randint(completion_time)
    
    return {
        'id': id,
        'completion_time': completion_time,
        'arrival_time': finishing_time - completion_time,
        'finishing_time': finishing_time
    }

In [None]:
def generate_sched(stages, transition_proba, length=15):
    """_summary_ To generate a trace
    """
    res = []
    prev = random.choice(stages)
    res.append(prev)
    
    for _ in range(length-1):
        prev = random.choices(stages, transition_proba[prev], k=1)[0]
        res.append(prev)
    return res

def gen_job(stages):
    return {i: None for i in stages}

def run_simulation_(stages, stages_completion, premature_stop_latency, transitions, total_func_calls, n_cores):
    trace = [generate_sched(stages=stages, transition_proba=transitions, length=total_func_calls) for _ in range(n_cores)]
    lasts = [0 for _ in stages]
    todo = [gen_job(stages) for _ in range(total_func_calls * n_cores // 2)]

    last_stage_cpu_timestamp = [0 for _ in trace]

    for workers in zip(*trace):
        workers = list(workers)
        workers.sort()

        for cpu, worker in enumerate(workers):
            timestamp = last_stage_cpu_timestamp[cpu] + stages_completion[worker]
            stage = new_stage(id=worker, completion_time=stages_completion[worker], finishing_time=timestamp)
            last_stage_cpu_timestamp[cpu] = timestamp
#             print(stage, last_stage_cpu_timestamp)

            if worker == stages[0]:
            # First function marks start of working on next job
                if todo[lasts[worker]][worker] is None:
                    todo[lasts[worker]][worker] = stage
                    lasts[worker] += 1
                else:
                    lasts[worker] += 1
                    todo[lasts[worker]][worker] = stage
            else:
                # If last function of job is not finished yet, then skip.
                last_stage = todo[lasts[worker]][worker-1]
                if last_stage is None or last_stage['finishing_time'] < stage['arrival_time']:
                    stage['completion_time'] = premature_stop_latency
                    new_timestamp = premature_stop_latency + stage['arrival_time']
                    stage['finishing_time'] = new_timestamp
                    last_stage_cpu_timestamp[cpu] = new_timestamp
                    continue
                else:
                    if todo[lasts[worker]][worker] is None:
                        todo[lasts[worker]][worker] = stage
                        lasts[worker] += 1
                    else:
                        lasts[worker] += 1
                        todo[lasts[worker]][worker] = stage

    return todo, last_stage_cpu_timestamp

def compute_metrics(todo, last_timestamps, n_stages):
    i = 0
    done_jobs = 0
    latencies = []

    def is_completed(job):
        i = 0
        while i < n_stages:
            if job[i] is None:
                return False
            i += 1
        return True
    
    def compute_latency(job):
        return job[n_stages-1]['finishing_time'] - job[0]['arrival_time']
    
    job = todo[i]
    while is_completed(job):
        done_jobs += 1
        latencies.append(compute_latency(job))
        i += 1
        job = todo[i]

    total_time = max(last_timestamps)

    throughput = done_jobs / total_time
    tail_latency = np.percentile(latencies, 99)
    mean_latency = np.mean(latencies)
    median_latency = np.median(latencies)

    return done_jobs, total_time, median_latency, mean_latency, tail_latency

In [11]:
def run_simulation(stages, stages_completion, premature_stop_latency, transitions, total_func_calls, n_cores, runs=1):
    names = ['total_work', 'duration', 'median_latency', 'mean_latency', 'tail_latency']
    todos = [run_simulation_(stages, stages_completion, premature_stop_latency,  transitions, total_func_calls, n_cores) for _ in range(runs)]
    n_stages = len(stages)
    data = [compute_metrics(todo, last_times, n_stages) for todo, last_times in todos]
    return pd.DataFrame(data=data, columns=names)

In [12]:
def rand(x, y, n):
    if x == y:
        return 0
    else:
        return 1/(n-1)

def ordered(x, y, n, weight=1):
    if x == y:
        return 0
    m = y - x
    if m < 0:
        m += n
    if m == 1:
        return weight
    else:
        return (1-weight)/(n-2)

def generate_order_transition(weight, stages):
    n_stages = len(stages)
    res = [ [0 for _ in stages ] for _ in stages ]
    
    for x, y in itertools.product(stages, stages):
        res[x][y] = ordered(x, y, n_stages, weight=weight)
    return res

In [19]:
n_stages = 4
stages_completion = [10, 30, 20, 10]
premature_stop_latency = 1
stages = [i for i in range(n_stages)]
n_cores = 1
total_func_calls = 10000
proba = [ [0 for _ in stages] for _ in stages ]
random_transitions = [
    [0, .5, .5],
    [.5, 0, .5],
    [.5, .5, 0]
]
ordered_transitions = [
    [0, 1, 0],
    [0, 0, 1],
    [1, 0, 0],
]
semi_ordered_transitions = [
    [0, .99, .01],
    [.01, 0, .99],
    [.99, .01, 0]
]

rand_weights = [.50, .75, .90, .99, 1]

random_transitions = copy.deepcopy(proba)
order_transition = {i: generate_order_transition(i, stages) for i in rand_weights}

for x, y in itertools.product(stages, stages):
    random_transitions[x][y] = rand(x, y, n_stages)

def order_transition(i):
    return generate_order_transition(i, stages)

# print("Optimal throughput:", n_cores/n_stages)
# print("Optimal latency:", n_stages)


# df_random = run_simulation(stages, order_transition(.5), total_time, n_cores, runs=1000)
# df_random.median()

# df_random = run_simulation(stages, order_transition(.9), total_time, n_cores, runs=1000)
# df_random.median()


In [20]:
print("Optimal:")
df_random = run_simulation(stages, stages_completion, premature_stop_latency, order_transition(1), total_func_calls, n_cores, runs=15)
df_random.median()

Optimal:


total_work          2499.0
duration          174991.0
median_latency        70.0
mean_latency          70.0
tail_latency          70.0
dtype: float64

In [21]:
# print("Semi-ordered:")
# df_semi_ordered = run_simulation(stages, stages_completion, premature_stop_latency, random_transitions, total_func_calls, n_cores, runs=15)
# df_semi_ordered.median()

In [24]:
todo, finishing = run_simulation_(stages, stages_completion, premature_stop_latency, order_transition(.99), total_func_calls, n_cores)
compute_metrics(todo, finishing, n_stages)

(4, 32719, 70.0, 70.0, 70.0)