In [1]:
import numpy as np

In [2]:
class Task():
    def __init__(self, period, execution_time, deadline ):
        self.period = period
        self.execution_time = execution_time
        self.deadline = deadline
        self.priority = None

    def __repr__(self) -> str:
        return "Task: " +  str(self.period) +" " + str(self.execution_time) + " " + str(self.deadline) + " " + str(self.priority)

class TaskSet():

    def __init__(self, n, ut, min_period, max_period):
        self.n = n
        self.ut = ut
        self.min_period = min_period
        self.max_period = max_period
        self.tasks = None

    def generate(self):
        self.utilities = self.uunifastdiscard()
        if self.utilities is None:
            print("Failed to generate taskset with given parameters")
            return False

        self.periods = self.periods_generation()
        self.execution_time = self.execution_times()
        self.deadline = self.task_deadlines()

        self.tasks = []
        for i in range(self.n):
            self.tasks.append(Task(self.periods[i],self.execution_time[i],self.deadline[i]))
        return True
        

    def uunifastdiscard(self,discard_limit = 1000):
        sumu = self.ut
        for j in range(discard_limit):
            utilities = []
            for i in range(self.n-1):
                nextsumu = sumu*(np.random.random()**(1/(self.n-i+1)))
                utilities.append(sumu - nextsumu)
                sumu = nextsumu

            utilities.append(sumu)

            if all(ut <= 1 for ut in utilities):
                return np.array(utilities)
            else:
                sumu = self.ut

        return None
    
    def periods_generation(self):
        periods = np.ceil(np.exp(np.random.uniform(low=np.log(self.min_period), high=np.log(self.max_period),size=(self.n))))
        return periods
    
    def execution_times(self):
        return np.ceil(self.utilities * self.periods)
    
    def task_deadlines(self):
        deadline = []
        for i in range(len(self.execution_time)):
            deadline.append(np.random.uniform(low = self.execution_time[i] , high = self.periods[i]))

        return np.floor(np.array(deadline))

            


In [3]:
cores = 2
taskset = TaskSet(cores*5,1,10,100)

taskset.generate()
print(taskset.tasks)

[Task: 11.0 4.0 4.0 None, Task: 11.0 1.0 10.0 None, Task: 39.0 1.0 16.0 None, Task: 25.0 1.0 4.0 None, Task: 11.0 1.0 2.0 None, Task: 40.0 6.0 18.0 None, Task: 49.0 3.0 10.0 None, Task: 87.0 1.0 40.0 None, Task: 74.0 4.0 30.0 None, Task: 70.0 27.0 36.0 None]


In [4]:
def DMPO(set):
    set.sort(key = lambda x:x.deadline)

In [5]:
def D_CMPO(taskset):
    taskset.sort(key = lambda x:x.deadline - x.execution_time)

In [6]:
def DkC(taskset,m):
    k = ((m - 1) + np.sqrt(5*m**2 - 6*m + 1))/(2*m)
    taskset.sort(key = lambda x:x.deadline - k*x.execution_time)    

In [7]:
DMPO(taskset.tasks)
print(taskset.tasks)

[Task: 11.0 1.0 2.0 None, Task: 11.0 4.0 4.0 None, Task: 25.0 1.0 4.0 None, Task: 11.0 1.0 10.0 None, Task: 49.0 3.0 10.0 None, Task: 39.0 1.0 16.0 None, Task: 40.0 6.0 18.0 None, Task: 74.0 4.0 30.0 None, Task: 70.0 27.0 36.0 None, Task: 87.0 1.0 40.0 None]


In [8]:
def OPA(taskset, cores, S_test):
    N = len(taskset)

    assigned_tasks = []

    taskset_copy = taskset.copy()

    priority_assigned = [False] * N

    for i in range(N-1,-1,-1):

        for j,task in enumerate(taskset_copy):

            if task.priority == None:
                if(S_test(assigned_tasks + [task],cores)):
                    task.priority = i
                    priority_assigned[j] = True
                    assigned_tasks.append(task)
                    break
        else:
            return taskset_copy,i
    
    return taskset_copy,i

In [9]:
def deadline_analysis(ptaskset,m):    #taskset = {period,execution time,deadline}
    num_tasks = len(ptaskset)
    for i in range(num_tasks):
        total_workload = 0
        ei,di = ptaskset[i][1],ptaskset[i][2]
        l = di
        for j in range(i):
            pj,ej,dj = ptaskset[j][0],ptaskset[j][1],ptaskset[j][2]
            max_jobs = np.floor(( l + dj - ej )/pj)
            workload_j = max_jobs*ej + min(ej,l + dj - ej - max_jobs*pj)
            workload_j = min(workload_j,di - ei + 1)
            total_workload += workload_j

        thresh = ei + np.floor(total_workload/m)
        if di < thresh:
            return 0
    return 1

In [10]:
def deadline_analysis_OPA(ptaskset,m):    #taskset = {period,execution time,deadline}
    Ck = ptaskset[-1].execution_time
    Dk = ptaskset[-1].deadline
    l = Dk
    total_workload = 0

    for task in ptaskset[:-1]:
        Ti,Ci,Di = task.period,task.execution_time,task.deadline
        max_jobs = np.floor(( l + Di - Ci )/Ti)
        workload_task = max_jobs*Ci + min(Ci,l + Di - Ci - max_jobs*Ti)
        workload_task = min(workload_task,Dk - Ck + 1)
        total_workload += workload_task

        thresh = Ck + np.floor(total_workload/m)
        if Dk < thresh:
            return 0
    return 1

In [16]:
cores = 2
set = TaskSet(cores*5,0.2,10,100)

set.generate()
print(set.tasks)

[Task: 17.0 1.0 6.0 None, Task: 83.0 1.0 65.0 None, Task: 18.0 1.0 12.0 None, Task: 24.0 1.0 11.0 None, Task: 85.0 1.0 11.0 None, Task: 81.0 2.0 3.0 None, Task: 63.0 1.0 45.0 None, Task: 66.0 1.0 49.0 None, Task: 11.0 1.0 8.0 None, Task: 25.0 3.0 8.0 None]


In [17]:
ordered_tasks,i = OPA(set.tasks, cores, deadline_analysis_OPA)
print(ordered_tasks)
print(i)

[Task: 17.0 1.0 6.0 9, Task: 83.0 1.0 65.0 8, Task: 18.0 1.0 12.0 7, Task: 24.0 1.0 11.0 6, Task: 85.0 1.0 11.0 5, Task: 81.0 2.0 3.0 None, Task: 63.0 1.0 45.0 4, Task: 66.0 1.0 49.0 3, Task: 11.0 1.0 8.0 2, Task: 25.0 3.0 8.0 1]
0
