In [None]:
%load_ext blackcellmagic
import sys
import uuid
# Put the parent directory at the front of the module search path;
# presumably that is where the project-local modules imported by this
# notebook live (TODO confirm). The guard keeps a re-run of this cell from
# stacking another ".." entry onto sys.path.
if sys.path[:1] != [".."]:
    sys.path.insert(0, "..")
def gen_uuid():
    """Return a short random identifier.

    The identifier is the first eight hexadecimal characters of a freshly
    generated UUID4.
    """
    fresh_id = uuid.uuid4()
    return fresh_id.hex[:8]

In [None]:
import algo
import coloredlogs
import graph
import importlib
import logging
import math
import networkx as nx
import schedule as sch
import random
import topo
import yaml
# Reload the project modules (same order as they are listed here) so that
# source edits are picked up without restarting the kernel.
for _module in (algo, graph, sch, topo):
    importlib.reload(_module)
del _module  # do not leave the loop variable in the notebook namespace

# Set the coloredlogs level to INFO.
coloredlogs.set_level(logging.INFO)

In [None]:
# Load the scenario description from the sample file and build the Scenario
# object from the resulting dict. The context manager makes sure the file
# handle is closed as soon as parsing is done.
# NOTE(review): yaml.Loader can construct arbitrary Python objects. It is kept
# as-is because the scenario file may rely on it, but this must only ever be
# pointed at trusted files.
with open("../samples/1e3h.yaml", "r") as scenario_file:
    sc = topo.Scenario.from_dict(yaml.load(scenario_file, Loader=yaml.Loader))

In [None]:
def _random_gen_args():
    """Build one keyword-argument set for ``graph.GraphGenerator``.

    Only ``graph_length`` is sampled at build time (an integer from
    ``random.randint(3, 11)``). The ``*_cb`` entries are zero-argument
    callables; presumably the generator invokes them itself whenever it needs
    a value (TODO confirm against ``graph.GraphGenerator``).
    """
    return {
        "graph_length": random.randint(3, 11),
        # Disabled alternative for "mi_cb", kept for reference:
        # "mi_cb": lambda: int(math.pow(10, (random.random() * 1) + 0)),
        "mi_cb": lambda: 1,
        "memory_cb": lambda: int(2e8),
        # Integer between 10**4 and 10**5, drawn uniformly in the exponent.
        "unit_size_cb": lambda: int(math.pow(10, (random.random() * 1) + 4)),
        # Integer between 10**1 and 10**2, drawn uniformly in the exponent.
        "unit_rate_cb": lambda: int(math.pow(10, (random.random() * 1) + 1)),
        "source_hosts": ["rasp1", "rasp2", "rasp3"],
        "sink_hosts": ["cloud1"],
    }


# Ten independent argument sets, one per graph to generate.
gen_args_list = [_random_gen_args() for _ in range(10)]

# One random chain graph per argument set, named g0, g1, ...
graph_list = [
    graph.GraphGenerator(
        "g{}".format(position), **generator_kwargs
    ).gen_random_chain_graph()
    for position, generator_kwargs in enumerate(gen_args_list)
]

In [None]:
# Write every generated graph to the case file ../cases/a.yaml.
with open("../cases/a.yaml", mode="w") as cases_file:
    graph.ExecutionGraph.save_all(graph_list, cases_file)

## Bare implementation of min-cut scheduling

In [None]:
# Run sch.FlowScheduler over every generated graph and report the latency
# figures computed by sch.LatencyCalculator.
sc.topo.clear_occupied()  # presumably resets occupancy left by earlier runs
flow_scheduler = sch.FlowScheduler(sc)
flow_scheduler.logger.setLevel(logging.INFO)
flow_calculator = sch.LatencyCalculator(sc.topo)
flow_calculator.logger.setLevel(logging.INFO)
flow_result_list = flow_scheduler.schedule_multiple(graph_list)
for g, result in zip(graph_list, flow_result_list):
    # A None entry means there is no result for this graph; leave it out of
    # the latency computation.
    if result is None:
        print('none')
        continue
    flow_calculator.add_scheduled_graph(g, result)
flow_latency, flow_bp = flow_calculator.compute_latency()
print(flow_latency)
print(flow_bp)
print(sum(flow_latency.values()))
# Averages: print nan instead of raising ZeroDivisionError when nothing was
# scheduled and the result mappings are empty.
print(sum(flow_latency.values()) / len(flow_latency) if flow_latency else float("nan"))
print(sum(flow_bp.values()) / len(flow_bp) if flow_bp else float("nan"))

## All cloud scheduling

In [None]:
# Split every graph into a source part and a sink+operator part, schedule the
# source parts on the first edge domain and the rest on the first cloud
# domain, then report the latency figures of the merged placements.
sc.topo.clear_occupied()  # presumably resets occupancy left by earlier runs
all_cloud_scheduler = sch.RandomScheduler(sc)
all_cloud_scheduler.logger.setLevel(logging.INFO)
all_cloud_calculator = sch.LatencyCalculator(sc.topo)
all_cloud_calculator.logger.setLevel(logging.INFO)
all_cloud_result_list = []  # merged results of the graphs that were scheduled
s_graph_list = []
t_graph_list = []
for g in graph_list:
    # s_cut holds the source vertices; t_cut holds sinks and operators.
    s_cut = {v.uuid for v in g.get_sources()}
    t_cut = {v.uuid for v in g.get_sinks()} | {v.uuid for v in g.get_operators()}
    s_graph_list.append(g.sub_graph(s_cut, gen_uuid()))
    t_graph_list.append(g.sub_graph(t_cut, gen_uuid()))
s_result_list = all_cloud_scheduler.schedule_multiple(s_graph_list, sc.get_edge_domains()[0].topo)
t_result_list = all_cloud_scheduler.schedule_multiple(t_graph_list, sc.get_cloud_domains()[0].topo)
for g, s_result, t_result in zip(graph_list, s_result_list, t_result_list):
    # A graph only counts when both of its halves were scheduled.
    if s_result.status == sch.SchedulingResultStatus.FAILED:
        print("s_graph {} failed: {}".format(g.uuid, s_result.reason))
        continue
    if t_result.status == sch.SchedulingResultStatus.FAILED:
        print("t_graph {} failed: {}".format(g.uuid, t_result.reason))
        continue
    result = sch.SchedulingResult.merge(s_result, t_result)
    all_cloud_result_list.append(result)
    all_cloud_calculator.add_scheduled_graph(g, result)
all_cloud_latency, all_cloud_bp = all_cloud_calculator.compute_latency()
print(all_cloud_latency)
print(all_cloud_bp)
print(sum(all_cloud_latency.values()))
# Averages: print nan instead of raising ZeroDivisionError when nothing was
# scheduled and the result mappings are empty.
print(sum(all_cloud_latency.values()) / len(all_cloud_latency) if all_cloud_latency else float("nan"))
print(sum(all_cloud_bp.values()) / len(all_cloud_bp) if all_cloud_bp else float("nan"))

## Global random scheduling

In [None]:
# Schedule every graph with sch.RandomScheduler on the whole topology and
# report the latency figures computed by sch.LatencyCalculator.
sc.topo.clear_occupied()  # presumably resets occupancy left by earlier runs
random_scheduler = sch.RandomScheduler(sc)
random_scheduler.logger.setLevel(logging.INFO)
random_calculator = sch.LatencyCalculator(sc.topo)
random_calculator.logger.setLevel(logging.INFO)
# Bind the results to the cell-specific name instead of leaving it as an
# empty list.
random_result_list = random_scheduler.schedule_multiple(graph_list, sc.topo)
result_list = random_result_list  # old generic name, kept as an alias
for g, result in zip(graph_list, random_result_list):
    if result.status == sch.SchedulingResultStatus.FAILED:
        print("graph {} failed: {}".format(g.uuid, result.reason))
        continue
    random_calculator.add_scheduled_graph(g, result)
random_latency, random_bp = random_calculator.compute_latency()
print(random_latency)
print(random_bp)
print(sum(random_latency.values()))
# Averages: print nan instead of raising ZeroDivisionError when nothing was
# scheduled and the result mappings are empty.
print(sum(random_latency.values()) / len(random_latency) if random_latency else float("nan"))
print(sum(random_bp.values()) / len(random_bp) if random_bp else float("nan"))