In [None]:
from simsystem.objects import Job, Task, Component, Core
import logging
import math

logger = logging.getLogger(__name__)

#region DBF
def dbf_edf(W: list[Task], t: int, speed_factor) -> int:
    """
    Demand Bound Function (EDF), the implementation is for an explicit deadline, 
    note that the implicit deadline is a special case of explicit deadline, 
    therefore the algorithm should be compatible for both.

    Arguments:
        W: list of tasks
        t: time interval
    """

    # Assert that the time interval is valid
    if t <= 0:
        return 0

    # Calculate the demand bound function for each task
    dbf = 0
    for task in W:
        # Extract task parameters
        C_i = task.wcet / speed_factor
        T_i = task.period
        D_i = task.deadline

        # Find tasks with deadlines within time interval
        if task.deadline <= t:
            dbf += ((t + T_i - D_i) // T_i) * C_i #Equation 3.3 in the handbook
    return dbf

def dbf_rm(W: list[Task], t: int, idx : int, speed_factor) -> int:
    """
    Demand Bound Function (RM)

    Arguments:
        W: list of tasks
        t: time interval
        idx: index of the task in the list W
    """

    # Assert that the time interval is valid
    if t <= 0:
        return 0

    # Calculate the demand bound function for each task
    prio_threshold = W[idx].priority
    HP_tasks = [task for task in W if task.priority < prio_threshold]# Gather Strictly High Priority tasks
    
    dbf = W[idx].wcet / speed_factor # The task itself is always in the interval
    for task in HP_tasks:
        # Extract task parameters
        C_k = task.wcet / speed_factor
        T_k = task.period

        # Find tasks with deadlines within time interval
        if task.deadline <= t:
            dbf += math.ceil(t / T_k) * C_k #Equation 3.3 in the handbook
    return dbf
#endregion

def sbf_bdr(R, t):
    """
    Supply Bound Function (SBF) for Bounded Delay Resource (BDR)

    Arguments:
        R: list of tasks
        t: time interval
    """
    # Assert that the time interval is valid
    if t <= 0:
        return 0
    
    alpha, delta = R
    
    if t >= delta:
        sbf = alpha * (t - delta)
    else:
        sbf = 0
    
    return sbf

def bdr_core(core: Core) -> tuple[float, int]:
    """
    Computes the Bounded Delay Resource (BDR) for a given core.
    The BDR is defined as the ratio of the budget to the speed factor.
    """
    
    # Assert that the core is valid
    if not isinstance(core, Core):
        raise ValueError("Invalid core object")
    
    R = (1, -1e-9) # Default values for alpha and delta
    return R

def bdr_interface(component : Component) -> tuple[float, int]:
    """
    Computes the Bounded Delay Resource (BDR) interface for a given component and core.
    Note that the Component and Core should share the same core_id.
    """
    
    Q = component.budget
    P = component.period
    alpha = Q / P
    delta = 2 * (P - Q)
    R = (alpha, delta)
    return R

def required_bdr(components : list[Component], core: Core) -> tuple[float, int]:
    """
    Computes the required Bounded Delay Resource (BDR) for a list of components and a core.
    Note that the Component and Core should share the same core_id.
    """
    
    # Assert that all components share the same core_id
    if not all(component.core_id == core.core_id for component in components):
        raise ValueError("All components must share the same core_id as the core")
    
    R_i = [bdr_interface(component) for component in components]
    alpha = sum(r[0] for r in R_i)
    delta = min(r[1] for r in R_i)
    
    R = (alpha, delta)
    
    return R

def bdr_schedulability(components: list[Component], core : Core) -> bool:
    """
    Checks if the Bounded Delay Resource (BDR) is schedulable for a list of components and a core.
    Note that the Component and Core should share the same core_id.
    """
    
    print(f"Checking BDR schedulability for core {core.core_id} and components {components}")
    
    # Assert that all components share the same core_id
    if not all(component.core_id == core.core_id for component in components):
        raise ValueError("All components must share the same core_id as the cores")
    
    # Calculate the required BDR
    required_R = required_bdr(components, core)
    
    print(f"Required BDR: {required_R}")
    
    # Calculate the available BDR
    available_R = bdr_core(core)
    
    print(f"Available BDR: {available_R}")
    
    alpha_required, delta_required = required_R
    alpha_available, delta_available = available_R
    
    # Check if the required BDR is less than or equal to the available BDR according to Theorem 3.1
    return alpha_required <= alpha_available and delta_required > delta_available


def schedulability_test(tasks: dict[str, Task], components: dict[str, Component], cores: dict[str, Core]) -> bool:
    """
    Checks if the system is schedulable based on the Bounded Delay Resource (BDR) model.
    
    Arguments:
        tasks: list of tasks
        components: list of components
        cores: list of cores
    """
    
    
    # # Check if the system is schedulable based on BDR
    # if not bdr_schedulability_all(list(components.values()), list(cores.values())):
    #     return False
    
    for core in cores.values():
        print(core)
    


    


In [209]:
import os
import json
import logging.config
import hydra
from omegaconf import DictConfig
from simsystem.objects import Job, Task, Component, Core, HierarchicalSystem

import simsystem.analytical_tools as at
from simsystem.input_model import Input_Model
from simsystem.config_structure import ExperimentConfig

logger = logging.getLogger(__name__)

architecture_path = os.path.join("../test-cases/1-tiny-test-case", "architecture.csv")
budgets_path = os.path.join("../test-cases/1-tiny-test-case", "budgets.csv")
tasks_path = os.path.join("../test-cases/1-tiny-test-case", "tasks.csv")



In [None]:
cores = Input_Model.read_architecture(architecture_path)
components = Input_Model.read_budgets(budgets_path)
tasks = Input_Model.read_tasks(tasks_path)

print(f"Loaded architecture: {cores}")
print(f"Loaded budgets: {components}")
print(f"Loaded tasks: {tasks}")


        

Loaded architecture: {'Core_1': Core(core_id='Core_1', speed_factor=0.62, scheduler='RM', components=[])}
Loaded budgets: {'Camera_Sensor': Component(component_id='Camera_Sensor', scheduler='RM', budget=50, period=50, core_id='Core_1', tasks=[], bdr_interface=None, priority=0)}
Loaded tasks: {'Task_0': Task(name='Task_0', wcet=14, period=50, component_id='Camera_Sensor', scheduler=None, priority=0, deadline=50), 'Task_1': Task(name='Task_1', wcet=33, period=100, component_id='Camera_Sensor', scheduler=None, priority=1, deadline=100)}
Evaluating Core_1
Checking BDR schedulability for core Core_1 and components [Component(component_id='Camera_Sensor', scheduler='RM', budget=50, period=50, core_id='Core_1', tasks=[Task(name='Task_0', wcet=14, period=50, component_id='Camera_Sensor', scheduler=None, priority=0, deadline=50), Task(name='Task_1', wcet=33, period=100, component_id='Camera_Sensor', scheduler=None, priority=1, deadline=100)], bdr_interface=None, priority=0)]
Required BDR: (1.0,