In [1]:
import numpy as np

# Rate Monotonic (RM)

- Upper bound (necessary condition):
    - UB: $U_p = \sum_{i=1}^{n}\frac{C_i}{T_i} \leq 1$

- Lower bound (sufficient conditions), with $U_i = C_i/T_i$:
   - HB (hyperbolic bound): $\prod_{i=1}^{n}(U_i+1) \leq 2$
   - LL (Liu & Layland bound): $U_p = \sum_{i=1}^{n}U_i \leq n(2^{1/n}-1)$

In [2]:
class RM:
    """Rate Monotonic (RM) utilization-based schedulability tests.

    Parameters
    ----------
    T : sequence of numbers
        One entry per task; the denominator of the task utilization
        ``C[i] / T[i]`` (the task period).
    C : sequence of numbers
        One entry per task; the numerator of the task utilization
        (the task execution time). Must have the same length as ``T``.

    All tests operate on the task set stored on the instance, never on
    notebook-level variables, so several instances can coexist safely.
    """

    def __init__(self, T, C):
        assert len(T) == len(C)
        # np.asarray returns ndarray inputs unchanged and lets plain lists work.
        self.T = np.asarray(T)
        self.C = np.asarray(C)
        self.n = len(T)

    def _utilization(self):
        """Return the total utilization, i.e. the sum of C[i] / T[i]."""
        return np.sum(self.C / self.T)

    def testUB(self):
        """Necessary test: total utilization must not exceed 1.

        Returns
        -------
        (passed, utilization)
        """
        up = self._utilization()
        return up <= 1, up

    def testHB(self):
        """Sufficient test (hyperbolic bound): prod(U_i + 1) <= 2.

        Returns
        -------
        (passed, product)
        """
        up = 1
        # Multiply sequentially so the floating-point result is reproducible.
        for u_i in self.C / self.T:
            up = up * (u_i + 1)
        return up <= 2, up

    def testLL(self):
        """Sufficient test (Liu & Layland): U <= n * (2**(1/n) - 1).

        Returns
        -------
        (passed, bound)
            Note that the second element is the bound, not the utilization.
        """
        bound = self.n * (2 ** (1 / self.n) - 1)
        return self._utilization() <= bound, bound

    def testFeasible(self):
        """Print a verdict that combines the UB and HB tests.

        * UB fails            -> infeasible.
        * UB and HB both pass -> schedulable by RM.
        * UB passes, HB fails -> inconclusive ("might be schedulable").
        """
        nec, util = self.testUB()
        suf, cal = self.testHB()
        print("...")
        if not nec:
            print("It is infeasible to be scheduled by RM")
            # This branch is only reached when util is strictly greater than 1.
            print("The utilization is ", util, " > 1")
        elif suf:
            print("It can be scheduled by RM")
            print("HB is ", cal, " <= 2")
        else:
            print("It might be schedulable")
        print()
        

## Example

In [3]:
# Example 1: a three-task set. T and C are the per-task values that enter the
# utilization C_i / T_i from the formulas above.
T = np.array([5,8,10])
C = np.array([3,1,1])
set1 = RM(T, C)
# Prints the verdict obtained from the UB (necessary) and HB (sufficient) tests.
set1.testFeasible()

...
It can be scheduled by RM
HB is  1.9800000000000002  <= 2



In [4]:
# Example 2: a three-task set that passes UB but not HB, so the printed
# verdict below is inconclusive ("might be schedulable").
T = np.array([4,6,10])
C = np.array([1,2,3])
set2 = RM(T, C)
set2.testFeasible()

...
It might be schedulable



In [5]:
# More examples: compare the LL test with the verdict printed by testFeasible
# (which uses UB and HB).

# set3: LL fails, yet the output below shows that HB still passes.
T = np.array([10, 18])
C = np.array([8, 0.9])
set3 = RM(T, C)
flag, _ = set3.testLL()
print(flag) # False: this task set does not pass the LL test
set3.testFeasible()

# set4: LL fails and HB fails as well, so the verdict is inconclusive.
T = np.array([5, 10])
C = np.array([3, 4])
set4 = RM(T, C)
flag, _ = set4.testLL()
print(flag) # False: this task set does not pass the LL test
set4.testFeasible()

False
...
It can be scheduled by RM
HB is  1.8900000000000001  <= 2

False
...
It might be schedulable



## Your test here

# Deadline Monotonic (DM)

- Response Time Analysis (RTA): RTA is an exact test (sufficient and necessary)

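- WCRT = the maximum response time among all jobs of the task

- With tasks indexed by decreasing priority, the WCRT $R_i$ of task $i$ is the smallest fixed point of $R_i = C_i + \sum_{j<i}\left\lceil \frac{R_i}{T_j} \right\rceil C_j$, found by iterating from $R_i^{(0)} = C_i$; the task set is schedulable iff $R_i \leq D_i$ for every task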

In [12]:
class DM:
    """Deadline Monotonic (DM) schedulability check via response time analysis.

    Parameters
    ----------
    T, C, D : sequences of numbers, all of the same length
        Per-task period, execution time and relative deadline.

    Tasks with a lower index are treated as higher priority: task ``i`` only
    suffers interference from tasks ``0 .. i-1``. For a deadline-monotonic
    priority assignment the inputs should therefore be ordered by
    non-decreasing ``D``; the class does not reorder them.
    """

    def __init__(self, T, C, D):
        assert len(T) == len(C)
        assert len(T) == len(D)
        # np.asarray returns ndarray inputs unchanged and lets plain lists work.
        self.T = np.asarray(T)
        self.C = np.asarray(C)
        self.D = np.asarray(D)
        self.n = len(C)

    def DM_guarantee(self):
        """Run the response time iteration for every task and print the trace.

        For each task the recurrence ``R = C[i] + sum(ceil(R / T[j]) * C[j])``
        over the higher-priority tasks ``j < i`` is iterated, starting from
        ``R = C[i]``, until it stops changing (task passes) or exceeds
        ``D[i]`` (task fails). Every task is analysed even after a failure.
        The final "schedulable" verdict is printed only if *all* tasks passed.
        Returns None; all results are printed.
        """
        # Always analyse the task set stored on this instance.
        T, C, D = self.T, self.C, self.D
        all_passed = True

        for i in range(self.n):
            print("--start to test Task["+str(i+1)+']--')
            r = 0
            Ri = C[i]
            print('R'+str(i)+"_"+str(r), '=' , C[i])

            while True:
                previous = Ri
                # Interference from every higher-priority task released in [0, previous).
                Ri = C[i] + sum(np.ceil(previous/T[j])*C[j] for j in range(i))
                r += 1
                print('R'+str(i)+"_"+str(r), '=' , Ri)
                if Ri > D[i]:
                    converged = False
                    break
                if Ri == previous:
                    converged = True
                    break

            if converged:
                # Announce success only after the last task, and only if no
                # earlier task missed its deadline.
                if i == self.n-1 and all_passed:
                    print()
                    print("It is shceduable")
            else:
                all_passed = False
                print()
                print('R'+str(i)+"_"+str(r)+ " = " + str(Ri), ">", "D[i]="+str(D[i]))
                print("It is not shceduable")

            print()
            

## Example

In [13]:
# Three tasks with D equal to T; the trace printed below shows every response
# time iteration converging within its deadline.
T = np.array([5, 10, 25])
C = np.array([2, 4, 1])
D = np.array([5, 10, 25])
set1 = DM(T, C, D)
set1.DM_guarantee()

--start to test Task[1]--
R0_0 = 2
R0_1 = 2

--start to test Task[2]--
R1_0 = 4
R1_1 = 6.0
R1_2 = 8.0
R1_3 = 8.0

--start to test Task[3]--
R2_0 = 1
R2_1 = 7.0
R2_2 = 9.0
R2_3 = 9.0

It is shceduable



## Example (not schedulable)

In [14]:
# Three tasks with D equal to T; in the trace printed below the third task's
# response time grows past its deadline of 8.
T = np.array([4, 6, 8])
C = np.array([1, 2, 3])
D = np.array([4, 6, 8])
set2 = DM(T, C, D)
set2.DM_guarantee()

--start to test Task[1]--
R0_0 = 1
R0_1 = 1

--start to test Task[2]--
R1_0 = 2
R1_1 = 3.0
R1_2 = 3.0

--start to test Task[3]--
R2_0 = 3
R2_1 = 6.0
R2_2 = 7.0
R2_3 = 9.0

R2_3 = 9.0 > D[i]=8
It is not shceduable



## Your test here

# Earliest Deadline First (EDF)

- In any interval [$t_1$, $t_2$] the computational demand $g(t_1, t_2)$ of 
the task set must be no greater than the available time:
$$g(t_1,t_2) \leq (t_2-t_1)$$
- When tasks are activated simultaneously, we can rewrite this as:
$$g(0,L) \leq L, \ \forall L>0$$
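- We can further limit the checkpoints by using a refined function 
that upper-bounds $g$
- We only need to check values of $L$ up to $\min(H, \max(D_{max}, L^*))$, where $H$ is the hyperperiod (the least common multiple of the periods), $D_{max}$ is the largest relative deadline, and $L^* = \frac{\sum_{i=1}^{n}(T_i-D_i)U_i}{1-U}$ with $U = \sum_{i=1}^{n}U_i$
- Within that range, check only the values of $L$ that coincide with a task deadline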

In [8]:
class EDF:
    """Earliest Deadline First (EDF) schedulability check (processor demand).

    Parameters
    ----------
    T, C, D : sequences of numbers, all of the same length
        Per-task period, execution time and relative deadline.
        ``np.lcm`` is applied to ``T``, so ``T`` is expected to hold integers.
    """

    # Slack below this value is treated as "utilization equals 1" so that
    # floating-point rounding cannot produce a meaningless finite L*.
    _EPS = 1e-12

    def __init__(self, T, C, D):
        assert len(T) == len(C)
        assert len(T) == len(D)
        # np.asarray returns ndarray inputs unchanged and lets plain lists work.
        self.T = np.asarray(T)
        self.C = np.asarray(C)
        self.D = np.asarray(D)
        self.n = len(T)

    def getL(self):
        """Return the checking horizon ``min(H, max(max(D), L*))``.

        ``H`` is the least common multiple of the periods and
        ``L* = sum((T_i - D_i) * C_i / T_i) / (1 - U)``. The L* formula is only
        meaningful for U < 1; when the utilization reaches (or exceeds) 1, L*
        is taken as infinite so that the horizon falls back to ``H``.
        """
        T, C, D = self.T, self.C, self.D

        slack = 1 - np.sum(C/T)
        if slack <= self._EPS:
            L_star = np.inf
        else:
            weighted = sum((T[i]-D[i])*(C[i]/T[i]) for i in range(self.n))
            L_star = weighted/slack

        H = T[0]
        for i in range(1, self.n):
            H = np.lcm(H, T[i])

        return np.min([H, np.max([np.max(D), L_star])])

    def EDF_guarantee(self):
        """Print the demand check ``g(0, L) <= L`` at every absolute deadline
        strictly before the horizon returned by ``getL``.

        Checking stops at the first failing checkpoint. If the utilization
        exceeds 1 the task set cannot be scheduled at all; this is reported
        directly, because the bounded checkpoint list is not a valid test in
        that situation. Returns None; all results are printed.
        """
        # Always analyse the task set stored on this instance.
        T, C, D = self.T, self.C, self.D

        utilization = np.sum(C/T)
        if utilization > 1 + self._EPS:
            print("Utilization =", utilization, "> 1: not schedulable by EDF")
            return

        L = self.getL()
        checkPoint = []
        for i in range(self.n):
            # Absolute deadlines of task i: D[i], T[i] + D[i], 2*T[i] + D[i], ...
            ddl = D[i]
            r = 0
            while ddl < L:
                checkPoint.append(ddl)
                r += 1
                ddl = r * T[i] + D[i]
        # Sorted, duplicate-free checkpoints.
        checkPoint = np.unique(checkPoint)
        print("The checking positions:", checkPoint)
        print()

        for l in checkPoint:
            # Demand of all jobs that are released and due within [0, l].
            g_0l = sum(np.floor((l+T[i]-D[i])/T[i])*C[i] for i in range(self.n))
            if g_0l <= l:
                print("g(0,"+str(l)+')', "demand="+str(g_0l), "Pass")
            else:
                print("g(0,"+str(l)+')', "demand="+str(g_0l), "Fail")
                break
        

## Example

In [9]:
# Example 1: three tasks whose deadlines D are shorter than their periods T.
T = np.array([5, 7, 10])
C = np.array([2, 2, 2])
D = np.array([2.5, 5, 8])

set1 = EDF(T, C, D)
# Prints the checkpoints and the demand g(0, L) at each of them.
set1.EDF_guarantee()

The checking positions: [ 2.5  5.   7.5  8.  12.  12.5]

g(0,2.5) demand=2.0 Pass
g(0,5.0) demand=4.0 Pass
g(0,7.5) demand=6.0 Pass
g(0,8.0) demand=8.0 Pass
g(0,12.0) demand=10.0 Pass
g(0,12.5) demand=12.0 Pass


In [10]:
# Example 2: three tasks with integer periods, execution times and deadlines.
T = np.array([6, 8, 9])
C = np.array([2, 2, 3])
D = np.array([4, 5, 7])

set2 = EDF(T, C, D)
# Prints the checkpoints and the demand g(0, L) at each of them.
set2.EDF_guarantee()

The checking positions: [ 4  5  7 10 13 16 21 22]

g(0,4) demand=2.0 Pass
g(0,5) demand=4.0 Pass
g(0,7) demand=7.0 Pass
g(0,10) demand=9.0 Pass
g(0,13) demand=11.0 Pass
g(0,16) demand=16.0 Pass
g(0,21) demand=18.0 Pass
g(0,22) demand=20.0 Pass


## Your test here

# RM(DM) vs EDF

- RM is easier to implement as it suffices to have a kernel that can handle fixed priorities.
- EDF presumes more sophisticated priority-handling, but induces fewer preemptions and fewer context switches.
- EDF achieves higher utilization (up to 1). RM achieves smaller utilization unless special conditions hold.
- EDF is able to handle overloads in a more predictable way.
---
- RM under permanent overload:
    - High priority tasks are executed at the necessary rate;
    - Low priority tasks are blocked.
- EDF under permanent overload:
    - All tasks are executed at a slower rate;
    - No task is blocked.