# **Real-Time Systems Project**

### Amir Mahdi Daraei 99105431

### Amirreza Abootalebi 99105197

# Project Documentation -- PHASE 1

#### Group No. 22 -- Project No. 9

#### Course: Real-Time Systems
#### Teaching Assistant: Mrs. Maleki
#### Title: Scheduling and Mapping Critical Mixed Tasks in Multi-Criticality Systems

---

### Introduction
In a modern car, various units like the engine control unit (high criticality level) and the infotainment system (low criticality level) coexist. These systems must operate simultaneously and in harmony, ensuring the execution of their critical tasks. In this project, a two-level mixed-criticality system with periodic LC and HC tasks is designed. LC tasks have one worst-case execution time, while HC tasks have two worst-case execution times. The system starts in the normal mode and transitions to the overrun mode if any HC task exceeds its small execution time without completion.

### Project Goals
- Implement task mapping algorithms based on resource overload and core utilization.
- Evaluate and compare the proposed mapping algorithms with WFD.
- Present results as schedulability and mapping capability charts.


### **Part 0: Imports**

In [32]:
import random
import heapq
import matplotlib.pyplot as plt

### **Part 1: Algorithm Implementation**


#### Task Class
The Task class represents a task with attributes like LC and HC execution times, deadline, period, criticality, and required resources.


In [33]:
class Task:
    """
    A class representing a task with attributes for LC and HC execution times, deadline, period, criticality, and resources.

    Attributes:
        id (int): Identifier for the task.
        exec_time_lc (float): Execution time for low criticality mode.
        exec_time_hc (list of float): Execution times for high criticality mode (two values).
        deadline (int): Deadline for the task.
        period (int): Period of the task.
        criticality (str): Criticality level ('LC' or 'HC').
        remaining_time (float): Remaining execution time for the task.
        state (str): State of the task ('Normal' or 'Overrun').
        resources (list of int): List of resources the task needs access to.
    """
    def __init__(self, id, exec_time_lc, exec_time_hc, deadline, period, criticality, resources):
        self.id = id
        self.exec_time_lc = exec_time_lc
        self.exec_time_hc = exec_time_hc
        self.deadline = deadline
        self.period = period
        self.criticality = criticality
        self.remaining_time = exec_time_lc if criticality == 'LC' else exec_time_hc[0]
        self.state = 'Normal'
        self.resources = resources

    def __lt__(self, other):
        return self.deadline < other.deadline

#### Core Class
The Core class represents a processing core that maintains tasks and current core utilization.

In [34]:
class Core:
    """
    A class representing a processing core with attributes for utilization and tasks.

    Attributes:
        id (int): Identifier for the core.
        utilization (float): Current utilization of the core.
        tasks (list of Task): List of tasks assigned to the core.
    """
    def __init__(self, id):
        self.id = id
        self.utilization = 0
        self.tasks = []

    def add_task(self, task):
        """
        Adds a task to the core and updates the core's utilization.

        Args:
            task (Task): The task to add to the core.
        """
        self.tasks.append(task)
        self.utilization += task.exec_time_lc / task.period


#### Uunifast Function and Generating Synthetic Tasks
The first function generates a set of utilizations for tasks using the UUnifast algorithm and the second one generates synthetic tasks with specified utilization and different criticality levels.

In [35]:
def uunifast(num_tasks, total_util):
    """
    Generates a list of utilizations for tasks using the UUnifast algorithm.

    Args:
        num_tasks (int): Number of tasks.
        total_util (float): Total utilization to be distributed among the tasks.

    Returns:
        list of float: List of utilizations for the tasks.
    """
    utilizations = []
    sum_util = total_util
    for i in range(1, num_tasks):
        next_util = sum_util * (1 - random.random() ** (1 / (num_tasks - i)))
        utilizations.append(next_util)
        sum_util -= next_util
    utilizations.append(sum_util)
    return utilizations

def generate_tasks(num_tasks, total_util, criticality_levels):
    """
    Generates a list of tasks with given utilization and criticality levels.

    Args:
        num_tasks (int): Number of tasks.
        total_util (float): Total utilization for the tasks.
        criticality_levels (list of str): List of criticality levels to be assigned to tasks.

    Returns:
        list of Task: Generated list of tasks.
    """
    utilizations = uunifast(num_tasks, total_util)
    tasks = []
    for i, u in enumerate(utilizations):
        period = random.randint(10, 100)
        exec_time_lc = u * period
        exec_time_hc = [exec_time_lc, exec_time_lc * random.uniform(1.1, 2)] if 'HC' in criticality_levels else [exec_time_lc, exec_time_lc]
        criticality = random.choice(criticality_levels)
        resources = random.sample(range(1, 7), random.randint(2, 6))
        tasks.append(Task(i+1, exec_time_lc, exec_time_hc, period, period, criticality, resources))
    return tasks

#### Calculating Resource Overload
This function calculates the resource overload for a task on a core.

In [36]:
def calculate_resource_overload(task, core):
    """
    Calculates the resource overload for a given task on a core.

    Args:
        task (Task): The task for which to calculate resource overload.
        core (Core): The core on which the resource overload is calculated.

    Returns:
        float: The calculated resource overload.
    """
    overload = 0
    for resource in task.resources:
        resource_usage = sum([t.exec_time_lc if t.criticality == 'LC' else t.exec_time_hc[0] for t in core.tasks if resource in t.resources])
        overload += resource_usage
    return overload

#### Assigning Tasks to Cores
This function assigns tasks to cores based on resource overload and utilization.


In [37]:
def heuristic_mapping(tasks, cores):
    """
    Assigns tasks to cores based on resource overload and utilization.

    Args:
        tasks (list of Task): List of tasks to be assigned.
        cores (list of Core): List of cores to which tasks are assigned.

    Returns:
        float: The ratio of successfully assigned tasks to the total number of tasks.
    """
    assigned_tasks = 0
    for task in tasks:
        cores.sort(key=lambda core: (-calculate_resource_overload(task, core), core.utilization))
        for core in cores:
            remaining_utilization = 1 - core.utilization
            task_utilization = task.exec_time_lc / task.period
            if remaining_utilization >= task_utilization:
                core.add_task(task)
                assigned_tasks += 1
                break
    return assigned_tasks / len(tasks)

####WFD Mapping
Assigns tasks to cores using the Worst-Fit Decreasing (WFD) algorithm.

In [38]:
def wfd_mapping(tasks, cores):
    """
    Assigns tasks to cores using the Worst-Fit Decreasing (WFD) algorithm.

    Args:
        tasks (list of Task): List of tasks to be assigned.
        cores (list of Core): List of cores to which tasks are assigned.

    Returns:
        float: The ratio of successfully assigned tasks to the total number of tasks.
    """
    assigned_tasks = 0
    for task in tasks:
        cores.sort(key=lambda core: core.utilization)  # Sort cores by current utilization in ascending order
        for core in cores:
            remaining_utilization = 1 - core.utilization
            task_utilization = task.exec_time_lc / task.period
            if remaining_utilization >= task_utilization:
                core.add_task(task)
                assigned_tasks += 1
                break
    return assigned_tasks / len(tasks)

### **Part 2: Evaluating and Comparing Mapping Capability**

####Mapping Capability Evaluation Function
This function evaluates the mapping capability of the heuristic and WFD algorithms.


In [39]:
def evaluate_mapping_capability(num_tasks, total_utils, criticality_levels, num_cores, num_runs):
    """
    Evaluates the mapping capability of the heuristic and WFD algorithms.

    Args:
        num_tasks (int): Number of tasks.
        total_utils (list of float): List of total utilizations to be tested.
        criticality_levels (list of str): List of criticality levels.
        num_cores (int): Number of cores.
        num_runs (int): Number of runs for each configuration.

    Returns:
        dict: Results of mapping capabilities for each utilization level.
    """
    results = {util: [] for util in total_utils}

    for util in total_utils:
        for _ in range(num_runs):
            tasks = generate_tasks(num_tasks, util, criticality_levels)

            # Heuristic mapping algorithm
            heuristic_cores = [Core(i+1) for i in range(num_cores)]
            heuristic_mapping_rate = heuristic_mapping(tasks, heuristic_cores)

            # WFD algorithm
            wfd_cores = [Core(i+1) for i in range(num_cores)]
            wfd_mapping_rate = wfd_mapping(tasks, wfd_cores)

            results[util].append((heuristic_mapping_rate, wfd_mapping_rate))
    return results

### **Part 3: Running Evaluations and Plotting Results**

###Evaluation Settings
The evaluation settings include the number of tasks, utilizations, criticality levels, number of cores, and the number of runs.

In [40]:
num_tasks = 400
total_utils = [0.25, 0.5]
criticality_levels = ['LC', 'HC']
num_cores_list = [2, 4, 8]
num_runs = 10
hyper_period = 200

Evaluations for mapping capability is performed for each number of cores and different utilizations.

In [None]:
# Evaluate mapping capability
mapping_results = {}

for num_cores in num_cores_list:
    mapping_results[num_cores] = evaluate_mapping_capability(num_tasks, total_utils, criticality_levels, num_cores, num_runs)

# Plot mapping capability results
for num_cores, results in mapping_results.items():
    plt.figure()
    for util, data in results.items():
        heuristic_mapping_rates = [d[0] for d in data]
        wfd_mapping_rates = [d[1] for d in data]
        plt.plot(heuristic_mapping_rates, label=f'Heuristic (Util={util})')
        plt.plot(wfd_mapping_rates, label=f'WFD (Util={util})')
    plt.xlabel('Run')
    plt.ylabel('Mapping Capability')
    plt.title(f'Mapping Capability Comparison with {num_cores} Cores')
    plt.legend()
    plt.show()

### **Part 4: Conclusion**

**In this project, task mapping algorithms in a two-level mixed-criticality system were implemented and evaluated. The evaluation results were presented as mapping capability charts, showing the performance of the proposed algorithm compared to the WFD algorithm.**