In [1]:
# Mount Google Drive at /content/drive; the project paths defined below all
# live under this mount point, so this must run before anything touches them.
from google.colab import drive
drive.mount('/content/drive')

import os, sys
# Root of the project folder on the mounted Drive; every other path derives from it.
PROJECT_ROOT = "/content/drive/MyDrive/AI_RL_Task_Scheduling"

DATA_DIR = f"{PROJECT_ROOT}/data"
SRC_DIR = f"{PROJECT_ROOT}/src"
MODELS_DIR = f"{PROJECT_ROOT}/models"
OUTPUTS_DIR = f"{PROJECT_ROOT}/outputs"

# Put src/ on the import path. Guarded so that re-running this cell does not
# keep stacking duplicate entries onto sys.path.
if SRC_DIR not in sys.path:
    sys.path.append(SRC_DIR)


Mounted at /content/drive


In [2]:
import numpy as np
import random
from dataclasses import dataclass
from typing import List, Optional

from config import RR_QUANTUM


In [3]:
@dataclass
class Task:
    """A schedulable job together with the bookkeeping the simulator updates.

    Attributes:
        task_id: Caller-assigned identifier.
        arrival_time: Tick at which the task becomes eligible for the ready queue.
        processing_time: Total ticks of work the task requires.
        deadline: Tick the completion time is compared against when counting misses.
        remaining_time: Ticks of work still outstanding. When omitted (or None)
            it is initialised to ``processing_time``, which is the correct value
            for a task that has not run yet.
        waiting_time: Ticks spent in the ready queue without being selected.
        start_time: Tick at which the task was first selected, or None if never run.
        completion_time: Tick at which the task finished, or None if unfinished.
    """

    task_id: int
    arrival_time: int
    processing_time: int
    deadline: int

    # Kept in the same positional slot as before so existing positional and
    # keyword call sites continue to work; only the default is new.
    remaining_time: Optional[int] = None
    waiting_time: int = 0
    start_time: Optional[int] = None
    completion_time: Optional[int] = None

    def __post_init__(self):
        # A fresh task has all of its work outstanding.
        if self.remaining_time is None:
            self.remaining_time = self.processing_time

    def is_complete(self) -> bool:
        """Return True once no work remains (remaining_time has reached zero or below)."""
        return self.remaining_time <= 0


In [4]:
class Scheduler:
    """Base interface for scheduling policies; subclasses override ``select_task``."""

    def select_task(self, ready_queue: List[Task], t: int) -> int:
        """Choose which queued task should run next.

        Args:
            ready_queue: Tasks currently waiting to be executed.
            t: Current time passed in by the caller.

        Returns:
            Position within ``ready_queue`` of the chosen task.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()


In [5]:
class SchedulerEnv:
    """Discrete-time simulator that runs at most one task per tick.

    On every tick the environment admits tasks whose arrival time has been
    reached, asks the scheduler which queued task to run, executes it for one
    unit of time, and charges one unit of waiting time to every other queued
    task.

    Note:
        The Task objects passed in are mutated in place (``remaining_time``,
        ``waiting_time``, ``start_time``, ``completion_time``). To compare
        several schedulers on the same workload, build a fresh task list for
        each run.
    """

    def __init__(self, tasks: "List[Task]", scheduler: "Scheduler", max_time: int = 100000):
        """
        Args:
            tasks: Workload to simulate; processed in order of ``arrival_time``.
            scheduler: Policy object whose ``select_task`` picks the next task.
            max_time: Upper bound on the simulated clock; ``run`` stops there
                even if tasks are still outstanding.
        """
        # sorted() returns a new list, so the caller's list order is untouched,
        # but the Task objects themselves are shared with the caller.
        self.tasks = sorted(tasks, key=lambda x: x.arrival_time)
        self.scheduler = scheduler
        self.max_time = max_time

        self.time = 0
        self.ready_queue: "List[Task]" = []
        self.completed_tasks: "List[Task]" = []

        self.task_ptr = 0  # index of next arriving task

    def _admit_arrivals(self):
        """Move every task whose arrival time has been reached into the ready queue."""
        while self.task_ptr < len(self.tasks) and self.tasks[self.task_ptr].arrival_time <= self.time:
            self.ready_queue.append(self.tasks[self.task_ptr])
            self.task_ptr += 1

    def step(self):
        """Advance the simulation by exactly one tick."""
        # 1) Add newly arrived tasks
        self._admit_arrivals()

        if self.ready_queue:
            # 2) Scheduler selects task
            idx = self.scheduler.select_task(self.ready_queue, self.time)
            task = self.ready_queue[idx]  # raises IndexError on an out-of-range index

            # A scheduler may legitimately answer with a negative index such as
            # -1 ("last task"). Normalise it so the comparison in step 4 really
            # skips the running task instead of charging it waiting time too.
            if idx < 0:
                idx += len(self.ready_queue)

            if task.start_time is None:
                task.start_time = self.time

            # 3) Execute selected task
            task.remaining_time -= 1

            # 4) Update waiting time for others
            for i, other in enumerate(self.ready_queue):
                if i != idx:
                    other.waiting_time += 1

            # 5) Completion check
            if task.is_complete():
                # The unit of work occupies [time, time + 1), so it ends at time + 1.
                task.completion_time = self.time + 1
                self.completed_tasks.append(task)
                self.ready_queue.pop(idx)

        self.time += 1

    def run(self):
        """Step until every task has arrived and finished, or ``max_time`` is hit.

        Returns:
            The completed tasks in order of completion. Tasks still unfinished
            when ``max_time`` is reached are not included.
        """
        while (
            self.time < self.max_time and
            (self.task_ptr < len(self.tasks) or len(self.ready_queue) > 0)
        ):
            self.step()

        return self.completed_tasks


In [6]:
def compute_metrics(tasks: "List[Task]") -> dict:
    """Summarise a list of finished tasks.

    Args:
        tasks: Tasks that have completed, i.e. whose ``completion_time`` is
            set. A task with ``completion_time`` of None cannot be compared
            against its deadline and will raise TypeError.

    Returns:
        A dict with:
            ``avg_waiting_time``: mean of ``waiting_time`` over the tasks.
            ``deadline_miss_rate``: fraction of tasks with
                ``completion_time > deadline``.
            ``throughput``: number of tasks divided by the latest completion time.
        For an empty list the two averages are NaN (undefined) and the
        throughput is 0.0, instead of raising ZeroDivisionError.
    """
    n = len(tasks)

    # Nothing completed (e.g. the simulation hit its time limit first):
    # averages are undefined and no work was delivered.
    if n == 0:
        return {
            "avg_waiting_time": float("nan"),
            "deadline_miss_rate": float("nan"),
            "throughput": 0.0
        }

    waiting_times = np.array([t.waiting_time for t in tasks])
    avg_waiting_time = waiting_times.mean()

    deadline_misses = sum(
        1 for t in tasks if t.completion_time > t.deadline
    )
    deadline_miss_rate = deadline_misses / n

    # The latest completion time is used as the total elapsed time.
    total_time = max(t.completion_time for t in tasks)
    throughput = n / total_time

    return {
        "avg_waiting_time": avg_waiting_time,
        "deadline_miss_rate": deadline_miss_rate,
        "throughput": throughput
    }


In [7]:
class RandomScheduler(Scheduler):
    """Policy that picks one of the queued tasks at random."""

    def select_task(self, ready_queue, t):
        """Return a random valid index into ``ready_queue``; ``t`` is not used."""
        last_index = len(ready_queue) - 1
        return random.randint(0, last_index)


In [8]:
def make_dummy_tasks():
    """Build a small fixed workload for sanity-checking the simulator.

    Returns:
        Five tasks with ids 0..4. Task ``i`` arrives at tick ``i``, needs
        3 ticks of work, and has its deadline at tick ``i + 10``.
    """
    return [
        Task(
            task_id=k,
            arrival_time=k,
            processing_time=3,
            remaining_time=3,
            deadline=k + 10,
        )
        for k in range(5)
    ]

# RandomScheduler draws from the global `random` generator. Seed it so the
# metrics below are reproducible across Restart & Run All.
SEED = 42  # arbitrary fixed value
random.seed(SEED)

tasks = make_dummy_tasks()
scheduler = RandomScheduler()
env = SchedulerEnv(tasks, scheduler)
completed = env.run()

metrics = compute_metrics(completed)
metrics


{'avg_waiting_time': np.float64(6.0),
 'deadline_miss_rate': 0.4,
 'throughput': 0.3333333333333333}

In [9]:
import pandas as pd

# to_csv does not create missing parent directories, so make sure outputs/
# exists first (a no-op when it is already there).
os.makedirs(OUTPUTS_DIR, exist_ok=True)

# One-row table: each metric becomes a column.
df = pd.DataFrame([compute_metrics(completed)])
df.to_csv(f"{OUTPUTS_DIR}/simulator_sanity_metrics.csv", index=False)

print("Saved simulator sanity metrics to outputs/")


Saved simulator sanity metrics to outputs/
