In [1]:
import numpy as np
import collections
import random
import heapq

np.random.seed(seed=123)

In [2]:
class subtask:
    def __init__(self, comp_demand, link_demand, order, proc_node):
        self.comp_demand = comp_demand
        self.link_demand = link_demand
        self.ori_link_demand = link_demand
        self.order = order
        self.proc_node = proc_node
        
        
    def recover_link_demand(self):
        self.link_demand = ori_link_demand
        
    def __str__(self):
        return f"({self.comp_demand}, {self.link_demand})"

In [3]:
class job:
    def __init__(self, source, destination, model, birth):
        self.source = source
        self.destination = destination
        self.model = model
        self.subtasks = []
        self.offloading = []
        self.routing = []
        self.birth = birth
        self.index = -1
    
    def add_subtasks(self):
        for order in range(self.model+2):
            self.subtasks.append(subtask(10, 3, order, self.offloading[order]))
            
    def assign_index(self, index):
        self.index = index

In [4]:
class job_waiting_queue:
    def __init__(self):
        self.buffer = collections.deque()
            
    def push(self, job):
        self.buffer.append(job)
        
    def pop(self):
        return self.buffer.popleft()
        
    def __len__(self):
        return len(self.buffer)

In [5]:
class backlog:
    def __init__(self):
        self.buffer = collections.deque()
    
    def push(self, job):
        self.buffer.append(job)
    
    def pop(self):
        return self.buffer.popleft()
        
    def __len__(self):
        return len(self.buffer)

In [6]:
class system_manager():
    def __init__(self, network, job_waiting_queue, backlog):
        self.network = network
        self.job_waiting_queue = job_waiting_queue
        self.backlog = backlog
        self.activated_job = [None]*10 # 현재 system 안에서 돌아가고 있는 jobs.
        self.activated_job_num = 0
        self.src_dst_pair = []
        for src in range(5):
            for dst in range(5):
                if src == dst:
                    continue
                self.src_dst_pair.append([src, dst])
    
    
    
    def create_job(self, time):
        job_num = np.random.poisson(lam=1, size=len(self.src_dst_pair))
        for index in range(len(self.src_dst_pair)):
            src, dst = self.src_dst_pair[index]
            for _ in range(job_num[index]):
                model = random.randint(1, 3)
                new_job = job(src, dst, model, time)
                if len(self.job_waiting_queue) < 30:
                    self.job_waiting_queue.push(new_job)
                else:
                    self.backlog.push(new_job)
    
    
    
    def move_job(self):
        while len(self.job_waiting_queue) < 30:
            if len(self.backlog) == 0:
                break
            else:
                job = self.backlog.pop()
                self.job_waiting_queue.push(job)
                
                
                
    def find_index(self):
        for index, room in enumerate(self.activated_job):
            if room == None:
                return index
                
                
                
    def schedule_job(self):
        while self.activated_job_num < 10 and len(self.job_waiting_queue) != 0:
            # 지금 job의 index 계산
            index = self.find_index()
            
            job = self.job_waiting_queue.pop()
            job.assign_index(index)
            self.activated_job[index] = job
            
            src, dst = job.source, job.destination
            
            # offloading scheduling
            for order in range(job.model+2):
                node = random.randint(0, 4)
                job.offloading.append(node)
            job.add_subtasks()
            
            print("offloading)")
            for k in job.offloading:
                print(k, end=' ')
            print()
            
            # transfer output data logic
            if dst != job.offloading[-1]:
                job.offloading.append(dst)
            
            # transfer input data logic
            if src == job.offloading[0]:
                self.network.nodes[src].push(job)
            else:
                job.subtasks.insert(0, subtask(0, 3, -1, -1))
                path = self.network.routing(src, job.offloading[0])
                job.routing = path
                self.network.links[f'{path[0]}{path[1]}'].push(job)
                self.network.links[f'{path[0]}{path[1]}'].waiting += 3
                del job.routing[0]
                
            self.activated_job_num += 1
            break
                
                
    
    def get_state(self):
        node_waiting_vector = [[0]*(5*10) for _ in range(5)]
        node_processing_vector = [[0]*(5*10) for _ in range(5)]
        
        # get node waiting vector
        for job_idx, job in enumerate(self.activated_job):
            if job == None:
                continue
            for subtask in job.subtasks:
                order, proc_node, rem_comp_demand = subtask.order, subtask.proc_node, subtask.comp_demand
                if order >= 0:
                    node_waiting_vector[proc_node][5*job_idx+order] = rem_comp_demand
                    
        # get node processing vector
        for node_idx, node in enumerate(self.network.nodes):
            job_idx, order = node.cur_proc_job, node.cur_proc_subtask
            if job_idx != -1 and order != -1:
                node_processing_vector[node_idx][5*job_idx+order] = 1
        
        print('node waiting vector)')
        for node in range(5):
            cnt = 0
            for s in range(50):
                print(node_waiting_vector[node][s], end='')
                cnt += 1
                if cnt == 5:
                    print(' | ', end=' ')
                    cnt = 0
            print()
        print()
        
        print('node processing vector)')
        for node in range(5):
            cnt = 0
            for s in range(50):
                print(node_processing_vector[node][s], end='')
                cnt += 1
                if cnt == 5:
                    print(' | ', end=' ')
                    cnt = 0
            print()
        print()
        return node_waiting_vector, node_processing_vector
            
        
        
    def step(self, time):
        #print(f'act : {self.activated_job_num}, wait : {len(self.job_waiting_queue)}, blog : {len(self.backlog)}')
        completed_job_index = self.network.step(time)
        for index in completed_job_index:
            self.activated_job[index] = None
            self.activated_job_num -= 1

In [7]:
class node:
    def __init__(self, capacity):
        self.capacity = capacity
        self.waiting_queue = collections.deque()
        self.cur_proc_job = -1
        self.cur_proc_subtask = -1
    
    def push(self, job):
        self.waiting_queue.append(job)
        
    def pop(self):
        return self.waiting_queue.popleft()

    def get_info(self, index):
        if len(self.waiting_queue) == 0:
            return
        print(index)
        for job in self.waiting_queue:
            print(job.subtasks[0], end=' ')
        print()

In [8]:
class link:
    def __init__(self, capacity):
        self.capacity = capacity
        self.waiting_queue = collections.deque()
        self.cur_remain = 0
        self.waiting = 0
        
    def push(self, job):
        self.waiting_queue.append(job)
        
    def pop(self):
        return self.waiting_queue.popleft()
    
    def get_waiting(self):
        return self.waiting
    
    def get_info(self, index):
        if len(self.waiting_queue) == 0:
            return
        else:
            print(index)
            for job in self.waiting_queue:
                print(job.subtasks[0], end=' ')
            print()

In [11]:
class network:
    def __init__(self):
        self.nodes = []
        self.links = {}
        self.adjacent = [[1, 2], [0, 2, 3], [0, 1, 3, 4], [1, 2, 4], [2, 3]]
        for n in range(5):
            self.nodes.append(node(10))
        for src in range(5):
            for dst in self.adjacent[src]:
                self.links[f'{src}{dst}'] = link(10)
                
                
    def step(self, time):
        #print("--------------------------------------------------------------")
        completed_job_index = []
        job_index1 = self.step_link(time)
        job_index2 = self.step_node(time)
        completed_job_index.extend(job_index1)
        completed_job_index.extend(job_index2)
        return completed_job_index
        '''
        print("node)")
        for index, node in enumerate(self.nodes):
            node.get_info(index)
        print("\nlink)")
        for key, link in self.links.items():
            link.get_info(key)
        print('\n')
        '''
            

    
    def step_node(self, time):
        completed_job_index = []
        for node_index, node in enumerate(self.nodes):
            if len(node.waiting_queue) == 0:
                continue
            job = node.waiting_queue[0]
            rem_tasks = job.subtasks
            rem_tasks[0].comp_demand = max(0, rem_tasks[0].comp_demand-(node.capacity*0.1))
            
            # record current processing task
            node.cur_proc_job = job.index
            node.cur_proc_subtask = rem_tasks[0].order
            
            if rem_tasks[0].comp_demand == 0:
                node.cur_proc_job = -1
                node.cur_proc_task = -1
                if len(job.offloading) == 1:
                    #print(f'completion time {(time-job.birth)/10}')
                    completed_job_index.append(job.index)
                    node.pop()
                else:
                    start, end = job.offloading[0], job.offloading[1]
                    if start != end:
                        path = self.routing(start, end)
                        job.routing = path
                        self.links[f'{path[0]}{path[1]}'].push(job)
                        self.links[f'{path[0]}{path[1]}'].waiting += rem_tasks[0].link_demand
                        #print(f"node to flow : {path[0]} {path[1]}")
                        del job.routing[0]
                        node.pop()
                    else:
                        del rem_tasks[0]
                    del job.offloading[0]
                
                    
        return completed_job_index
                    
                    
                    
    def step_link(self, time):
        completed_job_index = []
        for key, link in self.links.items():
            if len(link.waiting_queue) == 0:
                continue
            job = link.waiting_queue[0]
            rem_tasks = job.subtasks
            rem_tasks[0].link_demand = max(0, rem_tasks[0].link_demand-(link.capacity*0.1))
            link.waiting = max(0, link.waiting-(link.capacity*0.1))
            
            if rem_tasks[0].link_demand == 0:
                # 해당 layer의 destination node에 도달했다면
                if len(job.routing) == 1:
                    del rem_tasks[0]
                    if len(job.offloading) != 1:
                        node = job.routing[0]
                        self.nodes[node].push(job)
                    # job completion
                    else:
                        #print(f'completion time {(time-job.birth)/10}')
                        completed_job_index.append(job.index)
                else:
                    rem_tasks[0].link_demand = rem_tasks[0].ori_link_demand # recover data size
                    start, end = job.routing[0], job.routing[1]
                    #print(f"link to flow : {start} {end}")
                    self.links[f'{start}{end}'].push(job)
                    self.links[f'{start}{end}'].waiting += rem_tasks[0].link_demand
                    del job.routing[0]
                    
                link.pop()
        
        return completed_job_index
                
         
        
    def routing(self, start, end):
        graph = [[] for _ in range(5)]
        pre = [0]*5
        distance = [10000]*5
        for src in range(5):
            for dst in self.adjacent[src]:
                cost = self.links[f'{src}{dst}'].get_waiting()/self.links[f'{src}{dst}'].capacity
                graph[src].append((dst, cost))
                
        q = [(0, start)]
        distance[start] = 0
        while q:
            dist, idx = heapq.heappop(q)
            if dist != distance[idx]:
                continue
            for nidx, cost in graph[idx]:
                if distance[nidx] > dist+cost:
                    distance[nidx] = dist+cost
                    pre[nidx] = idx
                    heapq.heappush(q, (distance[nidx], nidx))
                    
        path = []
        cur_node = end
        while cur_node != start:
            path.append(cur_node)
            cur_node = pre[cur_node]
        path.append(start)
        path.reverse()
        return path