In [1]:
import sys
sys.path.insert(0, "../")

In [2]:
import graph
import importlib
import math
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import random
import schedule as sch
import time
import topo
import typing
import utils
import vivaldi
import yaml

In [3]:
topo_file = "../samples/1e3h.yaml"
sc = topo.Scenario.from_dict(yaml.load(open(topo_file, "r").read(), Loader=yaml.Loader))

In [4]:
Coord3D = vivaldi.create_coordinate_class(3)
coords = dict()
for n in sc.topo.get_hosts():
    coords[n.uuid] = Coord3D.random_unit_vector()
coords_result = vivaldi.vivaldi_compute(sc.topo, coords, 0.1, 3000)

In [5]:
for k, c in coords_result.items():
    print(k, c)
rasp1 = coords_result['rasp1']
rasp2 = coords_result['rasp2']
rasp3 = coords_result['rasp3']
# vm1 = coords_result['vm1']
# vm2 = coords_result['vm2']
# print(abs(rasp1-rasp2))
# print(abs(rasp1-rasp3))
# print(abs(rasp1-vm1))
# print(abs(vm1-vm2))

cloud1 (-5.581204050574298,9.77809723724813,-15.266563275164362)
rasp1 (2.5552867989637984,-3.300197747870691,4.287685067444123)
rasp2 (0.8075978638658398,-2.1412931942146476,5.663078409884061)
rasp3 (2.3350842202061894,-1.9886603602932715,5.24852418984888)


In [31]:
with open("../cases/chain1.yaml") as f:
    graph_list: typing.List[graph.ExecutionGraph] = graph.ExecutionGraph.load_all(f)
print(len(graph_list))

10


In [7]:
def get_graph_domain(g: graph.ExecutionGraph) -> topo.Domain:
    domain_set = set()
    for s in g.get_sources():
        for d in sc.get_edge_domains():
            if d.find_host(s.domain_constraint["host"]) is not None:
                domain_set.add(d.name)
    assert len(domain_set) == 1
    return sc.find_domain(list(domain_set)[0])

In [40]:
sc.topo.clear_occupied()
result_list = [sch.SchedulingResult() for _ in graph_list]
op_pick_list = dict()
for g, r in zip(graph_list, result_list):
    op_coords = {v.uuid: Coord3D.random_unit_vector() for v in g.get_vertices()}
    movable = dict()
    for v in g.get_vertices():
        movable[v.uuid] = len(v.domain_constraint) == 0
        if not movable[v.uuid]:
            op_coords[v.uuid] = coords_result[v.domain_constraint['host']]
    op_result = vivaldi.constrained_balance(g, op_coords, movable, 0.001, 2000)
    edge_domain = get_graph_domain(g)
    cloud_domain = sc.get_cloud_domains()[0]
    for v in g.get_sources():
        host = edge_domain.find_host(v.domain_constraint['host'])
        assert host is not None
        r.assign(host.node.uuid, v.uuid)
        host.node.occupy(1)
    for v in g.get_sinks():
        host = cloud_domain.find_host(v.domain_constraint['host'])
        assert host is not None
        r.assign(host.node.uuid, v.uuid)
        host.node.occupy(1)
    for v in g.get_operators():
        op_pick_list[v.uuid] = op_result[v.uuid]

In [41]:
print(sum([g.number_of_vertices() for g in graph_list]))
print(len(op_pick_list))

66
46


In [42]:
class PickItem:
    def __init__(self, coord):
        self.coord = coord
        self.min_dist = None

op_pick_list = {k: PickItem(v) for k, v in op_pick_list.items()}

In [43]:
big_result = sch.SchedulingResult()

while True:
    # do min_dist marking
    for domain in sc.get_edge_domains() + sc.get_cloud_domains():
        for host in domain.topo.get_hosts():
            if host.slots <= host.occupied or len(op_pick_list) == 0:
                continue
            self_coord = coords_result[host.uuid]
            for op_id, op_item in op_pick_list.items():
                dist = abs(op_item.coord - self_coord)
                if op_item.min_dist is None or dist < op_item.min_dist:
                    op_item.min_dist = dist
    while True:
        updated = False
        for domain in sc.get_edge_domains() + sc.get_cloud_domains():
            for host in domain.topo.get_hosts():
                if host.slots <= host.occupied or len(op_pick_list) == 0:
                    continue
                self_coord = coords_result[host.uuid]
                min_dist = 1e10
                min_op = None
                for op_id, op_item in op_pick_list.items():
                    dist = abs(op_item.coord - self_coord)
                    if dist < min_dist:
                        min_dist = dist
                        min_op = op_id
                    # if op_item.min_dist is None or dist < op_item.min_dist:
                    #     op_item.min_dist = dist
                assert min_dist < 1e10 and min_op is not None
                if min_dist <= op_pick_list[op_id].min_dist:
                    big_result.assign(host.uuid, min_op)
                    host.occupy(1)
                    op_pick_list.pop(min_op)
                    updated = True
        if not updated:
            break
    if len(op_pick_list) == 0:
        break
    for _, op_item in op_pick_list.items():
        op_item.min_dist = None


In [44]:
for idx, g in enumerate(graph_list):
    result_list[idx] = sch.SchedulingResult.merge(result_list[idx], big_result.extract(set([v.uuid for v in g.get_operators()])))

In [45]:
for r in result_list:
    print(r.assign_map)

{'5e46068a-v1': 'rasp1', '5e46068a-v7': 'cloud1', '5e46068a-v5': 'rasp3', '5e46068a-v4': 'rasp2', '5e46068a-v2': 'rasp1', '5e46068a-v6': 'rasp3', '5e46068a-v3': 'rasp1'}
{'cf3bfe16-v1': 'rasp2', 'cf3bfe16-v6': 'cloud1', 'cf3bfe16-v5': 'cloud1', 'cf3bfe16-v2': 'rasp2', 'cf3bfe16-v3': 'rasp3', 'cf3bfe16-v4': 'cloud1'}
{'55ad6790-v1': 'rasp2', '55ad6790-v11': 'cloud1', '55ad6790-v4': 'cloud1', '55ad6790-v5': 'cloud1', '55ad6790-v2': 'cloud1', '55ad6790-v7': 'cloud1', '55ad6790-v8': 'cloud1', '55ad6790-v9': 'cloud1', '55ad6790-v10': 'cloud1', '55ad6790-v3': 'cloud1', '55ad6790-v6': 'cloud1'}
{'a8d429d7-v1': 'rasp2', 'a8d429d7-v3': 'cloud1', 'a8d429d7-v2': 'cloud1'}
{'d5f91d85-v1': 'rasp3', 'd5f91d85-v5': 'cloud1', 'd5f91d85-v3': 'cloud1', 'd5f91d85-v2': 'cloud1', 'd5f91d85-v4': 'cloud1'}
{'46f682fb-v1': 'rasp1', '46f682fb-v3': 'cloud1', '46f682fb-v2': 'cloud1'}
{'65a7af2f-v1': 'rasp1', '65a7af2f-v6': 'cloud1', '65a7af2f-v3': 'cloud1', '65a7af2f-v5': 'cloud1', '65a7af2f-v2': 'rasp1', '65a7a

In [46]:
calculator = sch.LatencyCalculator(sc.topo)
for g, r in zip(graph_list, result_list):
    calculator.add_scheduled_graph(g, r)
lat, bp = calculator.compute_latency()
print(lat)
print(bp)

{'5e46068a': 105.0, 'cf3bfe16': 42.0, '55ad6790': 29.0, 'a8d429d7': 33.0, 'd5f91d85': 33.0, '46f682fb': 35.0, '65a7af2f': 32.0, 'c010fca7': 73.0, '15cc54ba': 35.0, 'd3e4e108': 64.0}
{'5e46068a': 0.3310841520915344, 'cf3bfe16': 0.2726026620550839, '55ad6790': 0.05961227786752828, 'a8d429d7': 0.3447204968944099, 'd5f91d85': 0.053304484657749805, '46f682fb': 0.18789013732833956, '65a7af2f': 0.014471243042671614, 'c010fca7': 0.1680327924235957, '15cc54ba': 0.0, 'd3e4e108': 0.26107910906298004}


In [47]:
g = graph_list[1]
op_coords = {v.uuid: Coord3D.random_unit_vector() for v in g.get_vertices()}
movable = dict()
for v in g.get_vertices():
    movable[v.uuid] = len(v.domain_constraint) == 0
    if not movable[v.uuid]:
        op_coords[v.uuid] = coords_result[v.domain_constraint['host']]
op_result = vivaldi.constrained_balance(g, op_coords, movable, 0.001, 2000)
for v in g.get_vertices():
    print(v.uuid, op_result[v.uuid])
    for host in sc.topo.get_hosts():
        print(host.uuid, "{:.2f}".format(abs(op_result[v.uuid] - coords_result[host.uuid])), end=",")
    print("")

cf3bfe16-v1 (0.8075978638658398,-2.1412931942146476,5.663078409884061)
cloud1 24.92,rasp1 2.51,rasp2 0.00,rasp3 1.59,
cf3bfe16-v2 (0.22219886281031212,-1.046819891687086,3.735245972063275)
cloud1 22.63,rasp1 3.29,rasp2 2.29,rasp3 2.76,
cf3bfe16-v3 (0.01001853913659417,-0.6504087445001521,3.0377376212322673)
cloud1 21.80,rasp1 3.88,rasp2 3.12,rasp3 3.48,
cf3bfe16-v4 (-1.6131484429814233,2.3799658313218406,-2.2888072859210276)
cloud1 15.46,rasp1 9.64,rasp2 9.46,rasp3 9.56,
cf3bfe16-v5 (-2.190397287669846,3.4568802435832207,-4.17968959591668)
cloud1 13.21,rasp1 11.83,rasp2 11.71,rasp3 11.79,
cf3bfe16-v6 (-5.581204050574298,9.77809723724813,-15.266563275164362)
cloud1 0.00,rasp1 24.89,rasp2 24.92,rasp3 24.94,


In [49]:
print(abs(op_result['cf3bfe16-v1']-op_result['cf3bfe16-v2']))
print(abs(op_result['cf3bfe16-v2']-op_result['cf3bfe16-v3']))
print(abs(op_result['cf3bfe16-v3']-op_result['cf3bfe16-v4']))
print(abs(op_result['cf3bfe16-v4']-op_result['cf3bfe16-v5']))
print(abs(op_result['cf3bfe16-v5']-op_result['cf3bfe16-v6']))

2.2928370436418795
0.8298675718735712
6.339552174211294
2.251310016475938
13.205079430429473


In [48]:
for u, v, e in g.get_edges():
    print("{} -- {} --> {}".format(u, int(e['unit_size']*e['per_second']/1e3), v))

cf3bfe16-v1 -- 591 --> cf3bfe16-v2
cf3bfe16-v2 -- 1635 --> cf3bfe16-v3
cf3bfe16-v3 -- 214 --> cf3bfe16-v4
cf3bfe16-v4 -- 604 --> cf3bfe16-v5
cf3bfe16-v5 -- 103 --> cf3bfe16-v6


In [50]:
sc.topo.clear_occupied()
flow_calculator = sch.LatencyCalculator(sc.topo)
flow_scheduler = sch.FlowScheduler(sc)
flow_result_list = flow_scheduler.schedule_multiple(graph_list)
for g, r in zip(graph_list, flow_result_list):
    assert r is not None
    flow_calculator.add_scheduled_graph(g, r)
lat, bp = flow_calculator.compute_latency()
print(lat)
print(bp)
for r in flow_result_list:
    print(r.assign_map)

{'5e46068a': 35.0, 'cf3bfe16': 33.0, '55ad6790': 28.0, 'a8d429d7': 28.0, 'd5f91d85': 32.0, '46f682fb': 28.0, '65a7af2f': 41.0, 'c010fca7': 30.0, '15cc54ba': 43.0, 'd3e4e108': 43.0}
{'5e46068a': 0.0, 'cf3bfe16': 0.0, '55ad6790': 0.0, 'a8d429d7': 0.0, 'd5f91d85': 0.0, '46f682fb': 0.0, '65a7af2f': 0.0, 'c010fca7': 0.0, '15cc54ba': 0.0, 'd3e4e108': 0.0}
{'5e46068a-v1': 'rasp1', '5e46068a-v2': 'rasp1', '5e46068a-v5': 'cloud1', '5e46068a-v4': 'cloud1', '5e46068a-v7': 'cloud1', '5e46068a-v6': 'cloud1', '5e46068a-v3': 'cloud1'}
{'cf3bfe16-v1': 'rasp2', 'cf3bfe16-v5': 'cloud1', 'cf3bfe16-v3': 'cloud1', 'cf3bfe16-v6': 'cloud1', 'cf3bfe16-v2': 'cloud1', 'cf3bfe16-v4': 'cloud1'}
{'55ad6790-v1': 'rasp2', '55ad6790-v4': 'cloud1', '55ad6790-v5': 'cloud1', '55ad6790-v2': 'cloud1', '55ad6790-v7': 'cloud1', '55ad6790-v8': 'cloud1', '55ad6790-v11': 'cloud1', '55ad6790-v9': 'cloud1', '55ad6790-v10': 'cloud1', '55ad6790-v3': 'cloud1', '55ad6790-v6': 'cloud1'}
{'a8d429d7-v1': 'rasp2', 'a8d429d7-v2': 'rasp2'

In [19]:
def unit_size_cb(r: int):
    return 10000 * math.pow(10, random.randint(0, 1))

def gen_graphs(graph_count, source_selector_dict):
    source_selector = graph.SourceSelector(source_selector_dict)
    gen_args_list = [
            {
            "total_rank": random.randint(3, 7),
            "max_node_per_rank": random.randint(1, 3),
            "max_predecessors": random.randint(1, 2),
            "mi_cb": lambda: 1,
            "memory_cb": lambda: int(2e8),
            "unit_size_cb": unit_size_cb,
            "unit_rate_cb": lambda: random.randint(10, 20),
            "source_hosts": source_selector,
            "sink_hosts": ["cloud1"],
        }
        for _ in range(graph_count)
    ]
    return [
        graph.GraphGenerator("g" + str(idx), **gen_args).gen_dag_graph()
        for idx, gen_args in enumerate(gen_args_list)
    ]

In [20]:
# source_dict = {'rasp1': 8, 'rasp2': 8, 'rasp3': 8}
source_dict = {'rasp'+str(i): 8 for i in range(1, 7)}
source_dict.update({'vm'+str(i): 16 for i in range(1, 7)})
graph_list = gen_graphs(40, source_dict)
with open("../cases/dag3.yaml", "w") as f:
    graph.ExecutionGraph.save_all(graph_list, f)