In [394]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import copy
from multiprocess import Process, Manager
import time

In [395]:
macrotick = 100
sync_error = 0
time_out = 4 * 60 * 60

NUM_FLOW = 100000
DATA_NAME = "harmonic0"
TOPO_NAME = "2"

task = pd.read_csv("../../data/utilization/utilization_10_10.csv")
network = pd.read_csv("../../data/utilization/utilization_topology.csv")

# task = pd.read_csv("../../dac_data/%s.csv"%DATA_NAME)[:NUM_FLOW]
# network = pd.read_csv("../../dac_data/%s_topology.csv"%TOPO_NAME)
for col in ['size','period','deadline','jitter']:
    task[col] = np.ceil(task[col] / macrotick).astype(int)
for col in ['t_proc','t_prop']:
    network[col] = np.ceil(network[col] / macrotick).astype(int)
    
nodes = list(network['link'].apply(lambda x:eval(x)[0])) + \
    list(network['link'].apply(lambda x:eval(x)[1]))
NODE_SET = list(set(nodes))
ES_set = [x for x in NODE_SET if nodes.count(x) == 2]
SW_set = list(set(NODE_SET) - set(ES_set))
LCM = np.lcm.reduce(task['period'])
net = np.zeros(shape = (max(NODE_SET) + 1, max(NODE_SET) + 1))

## 1. Model

In [396]:
net_var = {}

for _, row in network.iterrows():
    net_var.setdefault(eval(row['link'])[0], {})
    net_var[eval(row['link'])[0]]['msd'] = row['t_proc']
    net[eval(row['link'])[0], eval(row['link'])[1]] = 1

## Create mapping from Link to index
link_to_index = {}
index_to_link = {}

counter = 0
for _, row in network.iterrows():
    link = row['link']
    link_to_index[link] = counter
    index_to_link[counter] = link
    counter += 1

In [397]:
## Shortest path
def bfs_paths(graph, start, goal):
    queue = [(start, [start])]
    while queue:
        (vertex, path) = queue.pop(0)
        for _next in set(np.reshape(np.argwhere(graph[vertex] > 0),  -1)) - set(path):
            if _next == goal:
                yield path + [_next]
            else:
                queue.append((_next, path + [_next]))

In [398]:
task_attr = {}
task_var = {}
link_to_flow = {}
next_link = {} ## {flow: {link: [next_link]}}
pre_link = {} ## {flow: {link: [pre_link]}}
for k, row in task.iterrows():
    task_attr.setdefault(k, {})
    task_var.setdefault(k, {})
    
    ## f_k
    task_attr[k]['s'] = int(row['src'])
    task_attr[k]['d'] = int(eval(row['dst'])[0])
    task_attr[k]['ct'] = int(row['period'])
    task_attr[k]['rsl'] = int(row['size'] * 8)
    task_attr[k]['ml'] = int(row['deadline'])
    task_attr[k]['q'] = 0
    
    task_attr[k]['route'] = next(bfs_paths(net, int(row['src']), int(eval(row['dst'])[0])))
    
    count = 0
    next_link.setdefault(k, {})
    pre_link.setdefault(k, {})
    for a, b in zip(task_attr[k]['route'][:-1], task_attr[k]['route'][1:]):
        task_var[k][str((a, b))] = [-1] * 3 ## [weight, queue, offset]
        task_var[k][str((a, b))][0] = len(task_attr[k]['route']) * task_attr[k]['rsl'] / task_attr[k]['ct']
        link_to_flow.setdefault(str((a, b)), [])
        link_to_flow[str((a, b))].append(k)
        if count < len(task_attr[k]['route']) - 2:
            next_link[k][str((a, b))] = str((task_attr[k]['route'][count+1], task_attr[k]['route'][count + 2]))
        else:
            next_link[k][str((a, b))] = None
        if count > 0:
            pre_link[k][str((a, b))] = str((task_attr[k]['route'][count-1], task_attr[k]['route'][count]))
        else:
            pre_link[k][str((a, b))] = None
        count += 1
       
        
        
        

In [399]:
route_to_index = {}
index_to_route = {}
for k in task_attr:
    route_to_index.setdefault(k, {})
    index_to_route.setdefault(k, {})
    for i, v in enumerate(task_attr[k]['route'][:-1]):
        route_to_index[k][str((v, task_attr[k]['route'][i+1]))] = i
        index_to_route[k][i] = str((v, task_attr[k]['route'][i+1]))

## 2. DivPhases

In [400]:
def isAllLinkPhased(link_set):
    for link in link_set:
        if not link[1]:
            return False
    return True

From the paper:

The only condition for a link l i to be assigned to a phase l i .ϕ is that all frames transmitted through that link f ∈ F TT : f .s j .ζ = l i must have all previous links (links closer to destination) assigned to previous phases ∀f .s k .ζ : k < j|f .s k .ζ .ϕ < f .s j .ζ .ϕ.

But from Order1 and Order2, it seems two links can also have dependency even neither one is not the other's previous link?

In [401]:
link_dependency = {} ## {link, [depdent links...]}

for i in task_attr:
    for hop, link in enumerate(task_var[i].keys()):
        link_dependency.setdefault(link, set())
        if hop != len(task_var[i].keys()) - 1:
            link_dependency[link].add(list(task_var[i].keys())[hop+1])
            
    # for j in task_attr:
    #     if i < j:
    #         for hop_i, link_i in enumerate(task_var[i].keys()):
    #             for hop_j, link_j in enumerate(task_var[j].keys()):
    #                 if link_i == link_j:
    #                     link_ib = list(task_var[i].keys())[hop_i-1]
    #                     link_jb = list(task_var[j].keys())[hop_j-1]
    #                     link_dependency.setdefault(link_jb, set())
    #                     link_dependency[link_jb].add(link_ib)

In [402]:
from collections import defaultdict

In [403]:
def toposort(data):
    data = {k: set(v) for k, v in data.items()}
    graph = defaultdict(set)
    nodes = set()
    for k, v in data.items():
        graph[k] = v
        nodes.add(k)
        nodes.update(v)

    result = []
    while nodes:
        no_dep = set(n for n in nodes if not graph[n])
        if not no_dep:
            raise Exception('Cyclic dependencies exist among these items: {}'.format(', '.join(nodes)))
        nodes.difference_update(no_dep)
        result.append(no_dep)

        for node, edges in graph.items():
            edges.difference_update(no_dep)

    return result

In [404]:
result = toposort(link_dependency)
for i, nodes in enumerate(result, start=1):
    print(f"Phase {i}: {nodes}")

Phase 1: {'(7, 15)', '(3, 11)', '(1, 9)', '(4, 12)', '(6, 14)', '(5, 13)', '(0, 8)', '(2, 10)'}
Phase 2: {'(6, 7)', '(1, 6)', '(2, 5)', '(3, 4)', '(0, 7)', '(5, 4)'}
Phase 3: {'(6, 5)', '(2, 3)', '(1, 0)', '(5, 6)'}
Phase 4: {'(7, 6)', '(2, 1)', '(4, 5)', '(1, 2)'}
Phase 5: {'(10, 2)', '(0, 1)', '(9, 1)', '(6, 1)', '(3, 2)', '(5, 2)'}
Phase 6: {'(13, 5)', '(11, 3)', '(8, 0)', '(14, 6)', '(4, 3)', '(7, 0)'}
Phase 7: {'(12, 4)', '(15, 7)'}


## 3. Scheduling

In [405]:
def collision(link, flow, offset, scheduled_frame):
    global task_attr
    w_i = task_attr[flow]['rsl']
    p_i = task_attr[flow]['ct']
    o_i = offset
    same_link_frames = scheduled_frame[link]
    for flow_j, sche in same_link_frames.items():
        w_j = task_attr[flow_j]['rsl']
        p_j = task_attr[flow_j]['ct']
        o_j = sche[1]
        lcm = np.lcm(p_i, p_j)
        for u, v in [(u, v) 
                     for u in range(0, int(lcm / p_i))
                     for v in range(0, int(lcm / p_j))]:
            if (o_j + v * p_j <= o_i + u * p_i + w_i ) and (o_i + u * p_i <= o_j + v * p_j + w_j):
                return True
    return False
        

In [406]:
def get_potential_order_violate_set(link, flow, offset, scheduled_frame) -> list:
    global task_attr, next_link, pre_link
    
    violate_set = []
    q_i = task_attr[flow]['q']
    o_i = offset

    ## Next link must be already scheduled
    next_link_i = next_link[flow][link]
    if next_link_i == None:
        return violate_set
    
    on_i = scheduled_frame[next_link_i][flow][1]
    
    ## Find other flows scheduled on the next link
    for next_link_j in scheduled_frame:
        if next_link_i == next_link_j and link in scheduled_frame:
            for flow_j in scheduled_frame[next_link_j]:
                if flow_j != flow:
                    q_j = task_attr[flow_j]['q']
                    if q_j == q_i:
                        link_j = pre_link[flow_j][next_link_j]
                        if link_j == None:
                            continue
                        if link_j not in scheduled_frame:
                            continue
                        if flow_j not in scheduled_frame[link_j]:
                            continue
                        o_j = scheduled_frame[link_j][flow_j][1]
                        on_j = scheduled_frame[next_link_j][flow_j][1]
                        
                        violate_set.append([
                            flow,
                            flow_j,
                            o_i, 
                            on_i,
                            o_j,
                            on_j
                        ])
    return violate_set

In [407]:
def order1(i, j, o_i, on_i, o_j, on_j):
    global task_attr
    p_i, p_j = task_attr[i]['ct'], task_attr[j]['ct']
    lcm = np.lcm(p_i, p_j)
    for u, v in [(u, v) 
                     for u in range(0, int(lcm / p_i))
                     for v in range(0, int(lcm / p_j))]:
        if (o_j + v * p_j) < (o_i + u * p_i) and (on_j + v * p_j) > (on_i + u * p_i):
            return True
    return False

def order2(i, j,  o_i, on_i, o_j, on_j):
    global task_attr
    p_i, p_j = task_attr[i]['ct'], task_attr[j]['ct']
    lcm = np.lcm(p_i, p_j)
    for u, v in [(u, v) 
                     for u in range(0, int(lcm / p_i))
                     for v in range(0, int(lcm / p_j))]:
        if (o_j + v * p_j) > (o_i + u * p_i) and (on_j + v * p_j) < (on_i + u * p_i):
            return True
    return False
    

In [408]:
def schedule_link(link, scheduled_frame):
    scheduled_frame = copy.deepcopy(scheduled_frame)
    flag = 3
    for i in link_to_flow[link]:
        next_link_i = next_link[i][link]
        if next_link_i == None:
            offset = task_attr[i]['ml']
        else:
            offset = min(scheduled_frame[next_link_i][i][1] - task_attr[i]['rsl'] - net_var[eval(link)[0]]['msd'], task_attr[i]['ml'])
        
        while flag:
            if link in scheduled_frame and collision(link, i, offset, scheduled_frame):
                offset -= 1
                flag = 3 ## offset - 1
                continue
            
            collision_set = get_potential_order_violate_set(link, i, offset, scheduled_frame)
            for flow, flow_j, offset, on_i, o_j,on_j in collision_set:
                if order1(flow, flow_j, offset,  on_i, o_j, on_j):
                    if task_attr[i]['q'] == 8:
                        flag = 1
                        break
                    else:
                        flag = 2 ## queue + 1
                        break
                if order2(flow, flow_j, offset,  on_i, o_j, on_j):
                    if task_attr[i]['q'] == 8:
                        flag = 0 ## failed
                        break
                    else:
                        flag = 2 ## queue + 1
                        break
            
            if offset < 0:
                flag = 0 ## failed
            
            if flag == 0:
                print("Failed")
                break
            elif flag == 1:
                offset -= 1
                flag = 3
                continue
            elif flag == 2:
                task_attr[i]['q'] += 1
                flag = 3
                continue
            elif flag == 3:
                scheduled_frame.setdefault(link, {})
                scheduled_frame[link][i] = [task_attr[i]['q'], offset]
                # print(f"{i}-{link} {[task_attr[i]['q'], offset]}")
                break
    return scheduled_frame, flag

In [409]:
def merge_dict(dict1, dict2):
    dict1 = dict1.copy()
    for key, value in dict2.items():
        if key in dict1:
            dict1[key].update(value)
        else:
            dict1[key] = value
    return dict1

In [410]:
# # Search should starts from the maximum offset
# # Manually create 4 processes for seaching

# ## scheduled_frame = {link: {flow: [queue, offset]}}

# scheduled_frame = {}


# for phase in range(len(result)):
#     ## [TODO] Make this parallel
#     print("Phase: ", phase)
#     phase_result_list = []
#     ## Create 4 processes for searching
#     for link in result[phase]:
#         schedule, flag = schedule_link(link, scheduled_frame)
#         if flag == 0:
#             print("Failed")
#         phase_result_list.append(schedule)
#     for schedule in phase_result_list:
#         scheduled_frame = merge_dict(scheduled_frame, schedule)
                    

In [411]:
scheduled_frame = {}

def schedule_and_update(link, scheduled_frame, result_dict):
    schedule, flag = schedule_link(link, scheduled_frame)
    if flag == 0:
        print("Failed")
    else:
        result_dict[link] = schedule


if __name__ == '__main__':
    for phase in range(len(result)):
        print("Phase: ", phase)
        
        with Manager() as manager:
            result_dict = manager.dict()
            processes = []

            for link in result[phase]:
                # If 4 processes are already running, wait until one has finished
                while len(processes) >= 4:
                    for p in processes:
                        if not p.is_alive():
                            p.join()
                            processes.remove(p)
                    time.sleep(0.1)  # Optional short sleep to prevent excessive CPU usage

                # Start a new process
                p = Process(target=schedule_and_update, args=(link, scheduled_frame, result_dict))
                p.start()
                processes.append(p)

            # Ensure all processes have finished execution
            for p in processes:
                p.join()

            # Update the main scheduled_frame dict
            for link, schedule in result_dict.items():
                scheduled_frame = merge_dict(scheduled_frame, schedule)

Phase:  0
Phase:  1
Phase:  2
Phase:  3
Phase:  4
Phase:  5
Phase:  6


In [392]:
count = 0
for link, flows in scheduled_frame.items():
    for flow in flows:
        count += 1
count

317

In [393]:
count = 0
for flow in task_attr:
    for link in task_attr[flow]['route']:
        count += 1
    count -= 1
print(count)

317
