<a href="https://colab.research.google.com/github/Mohammadhosseinkarimi/DQN_V1/blob/main/DQN_V1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# first i create my environment
# i use petting zoo to create environment for task scheduler
# so first install petting zoo
!pip install pettingzoo



In [None]:
from pettingzoo.utils.env import AECEnv #to have Agent Environment cycle
from gymnasium import spaces
import numpy as np
import functools

import gymnasium
from gymnasium.spaces import Discrete
from gymnasium.utils import seeding

from pettingzoo import ParallelEnv
from pettingzoo.utils import parallel_to_aec, wrappers


num_machines = 2    # number of agent
num_resources = 3   # e.g CPU, RAM , GPU
max_queue = 10      # max tasks in queue
max_resource = 10   # max number of resource that agent have
max_priority = 5    # priority for tasks between 1 to 5
max_deadline = 20   # max deadline for every task
total_CPU = 5       # number of CPUs
total_RAM = 5       # number of RAMS
total_GPU = 5       # number of GPU



In [2]:
class TaskSchedulingEnv(AECEnv):
    metadata = {"render_modes": ["human"], "name": "task_scheduler_v1"}

    def __init__(self):
        super().__init__()

        self.current_time = None
        self.resources = None
        self.running_tasks = None
        self.future_tasks = None
        self.task_queue = None
        self.rewards = None
        self.terminations = None
        self.truncations = None
        self.infos = None
        self._agent_selector = None
        self.agent_selection = None

        self.agents = [f"agent_{i}" for i in range(num_machines)]
        self.possible_agents = self.agents[:]

        # this create the base of observasion for any agent
        # we want a matrix that show
        RESOURCE_POOL = ['cpu_1', 'cpu_2', 'cpu_3', 'ram_1', 'ram_2', 'ram_3', 'ram_4', 'ram_5']
        NUM_TOTAL_RESOURCES = len(RESOURCE_POOL)


        obs_space_resources = spaces.MultiBinary(NUM_TOTAL_RESOURCES)


        # ب) فضای مربوط به تسک‌ها (ماتریس MAX_TASKS × NUM_TASK_FEATS)
        #   هر ویژگی: req_CPU, req_RAM, duration, deadline_rem, priority → 5 ویژگی
        NUM_TASK_FEATS = NUM_RESOURCES + 3  # req هر منبع + duration + deadline_rem + priority
        obs_space_tasks = spaces.Box(
            low=0,
            high=MAX_TASK_FEATURE_VALUE,
            shape=(MAX_TASKS, NUM_TASK_FEATS),
            dtype=np.int32
        )

        # ج) فضای نهاییِ مشاهده به صورت Dict
        obs_space_dict = spaces.Dict({
            "resources": obs_space_resources,  # یک بردار 2تایی (CPU, RAM)
            "tasks": obs_space_tasks           # ماتریس (MAX_TASKS × 5)
        })

        # حالا برای هر عامل همین فضای مشاهده را اختصاص می‌دهیم
        self.observation_spaces = {
            agent: obs_space_dict
            for agent in self.agents
        }

        # ---------------- تعریف فضای عمل ----------------
        # هر عامل می‌تواند {0, 1, ..., MAX_TASKS} را انتخاب کند:
        #    0 = skip، 1..MAX_TASKS = تخصیص تسک از صف
        self.action_spaces = {
            agent: spaces.Discrete(MAX_TASKS + 1)
            for agent in self.agents
        }

        # Action space: select task index or 0 for "no action"
        self.action_spaces = {
            agent: spaces.Discrete(MAX_TASKS + 1)
            for agent in self.agents
        }

        self.reset()


    def reset(self, seed=None, options=None):
        self.agent_idx = 0
        self.agents = self.possible_agents[:]
        self.resources = np.full((NUM_MACHINES, NUM_RESOURCES), MAX_RESOURCE)
        self.tasks = self._generate_tasks()
        self.task_queue = self.tasks.copy()
        self._agent_selector = iter(self.agents)
        self.agent_selection = next(self._agent_selector)

        self.rewards = {agent: 0 for agent in self.agents}
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}

    def observe(self, agent):
        i = int(agent.split('_')[1])
        flat_resources = self.resources[i].tolist()
        flat_tasks = []
        for task in self.task_queue:
            flat_tasks += task['resources'] + [task['duration']]
        while len(flat_tasks) < MAX_TASKS * (NUM_RESOURCES + 1):
            flat_tasks += [0] * (NUM_RESOURCES + 1)
        return np.array(flat_resources + flat_tasks, dtype=np.int32)

    def _generate_tasks(self):
        return [
            {
                "resources": list(np.random.randint(1, MAX_RESOURCE // 2, size=NUM_RESOURCES)),
                "duration": np.random.randint(1, 5)
            }
            for _ in range(MAX_TASKS)
        ]

    def step(self, action):
        agent = self.agent_selection
        i = int(agent.split('_')[1])

        reward = 0
        if action > 0 and action <= len(self.task_queue):
            task = self.task_queue[action - 1]
            if all(task["resources"][j] <= self.resources[i][j] for j in range(NUM_RESOURCES)):
                self.resources[i] -= task["resources"]
                self.task_queue.pop(action - 1)
                reward = 1  # reward for successful scheduling

        self.rewards[agent] = reward

        try:
            self.agent_selection = next(self._agent_selector)
        except StopIteration:
            self._agent_selector = iter(self.agents)
            self.agent_selection = next(self._agent_selector)

    def render(self):
        for i, res in enumerate(self.resources):
            print(f"Machine {i} resources: {res}")
        print(f"Queue: {self.task_queue}")



NameError: name 'AECEnv' is not defined

In [None]:


def dirichlet_integer_partition(total, num_parts, alpha=2.0):
    """
    تخصیص 'total' واحد از یک منبع به 'num_parts' ماشین با استفاده از توزیع Dirichlet و سپس گرد کردن.
    با alpha بزرگ‌تر (مثلاً 2.0) تقسیم به نحو متوازن‌تر انجام می‌شود.
    خروجی: لیستی از اعداد صحیح به طول num_parts که جمعشان برابر total است.
    """
    alpha_vector = np.ones(num_parts) * alpha
    frac = np.random.dirichlet(alpha_vector)
    real_vals = frac * total
    floored = np.floor(real_vals).astype(int)
    remainder = int(total - floored.sum())

    if remainder > 0:
        fractions = real_vals - floored
        idx_desc = np.argsort(-fractions)
        for i in range(remainder):
            floored[idx_desc[i]] += 1

    return floored.tolist()

# ====================== تعریف کلاس محیط ======================

class TaskSchedulingEnv(AECEnv):
    metadata = {
        "render_modes": ["human"],
        "name": "task_scheduler_v3"
    }

    def __init__(self):
        super().__init__()

        # ۱. تعریف لیست Agentها (هر ماشین یک Agent)
        self.agents = [f"agent_{i}" for i in range(NUM_MACHINES)]
        self.possible_agents = self.agents[:]

        # ۲. تعریف فضای مشاهده (Observation Space) برای هر Agent
        #    هر Observation شامل:
        #      - remaining_resources_of_machine_i (NUM_RESOURCES)
        #      - current_time (1)
        #      - وضعیت صف انتظار: برای MAX_TASKS تسک، هرکدام:
        #          [req_CPU, req_RAM, duration, deadline_remaining, priority]  => (NUM_RESOURCES + 3)
        #    => مجموع طول بردار: NUM_RESOURCES + 1 + MAX_TASKS * (NUM_RESOURCES + 3)
        obs_dim = NUM_RESOURCES + 1 + MAX_TASKS * (NUM_RESOURCES + 3)
        high_val = max(TOTAL_CPU, TOTAL_RAM, MAX_DEADLINE, MAX_PRIORITY)
        self.observation_spaces = {
            agent: spaces.Box(
                low=0,
                high=high_val,
                shape=(obs_dim,),
                dtype=np.int32
            )
            for agent in self.agents
        }

        # ۳. تعریف فضای عمل (Action Space) برای هر Agent
        #    هر عدد از 0 تا MAX_TASKS معانی زیر را دارد:
        #      0 = skip (عدم اقدام)
        #      k (1 <= k <= len(task_queue)) = انتخاب تسک kام از صف
        self.action_spaces = {
            agent: spaces.Discrete(MAX_TASKS + 1)
            for agent in self.agents
        }

        # ۴. متغیرهای حالت داخلی که در reset مقداردهی می‌شوند
        self.current_time = None     # شمارندهٔ زمان شبیه‌سازی
        self.resources = None        # np.ndarray شکل (NUM_MACHINES, NUM_RESOURCES)
        self.running_tasks = None    # لیست دیکشنری‌های {machine, finish_time, task}
        self.future_tasks = None     # لیست تمام taskهایی که هنوز ARRIVAL نشده‌اند
        self.task_queue = None       # لیست taskهایی که وارد صف شده‌اند
        self.rewards = None          # دیکشنری agent-> float
        self.terminations = None     # دیکشنری agent-> bool
        self.truncations = None      # دیکشنری agent-> bool (در این مثال استفاده نمی‌شود)
        self.infos = None            # دیکشنری agent-> dict

        self._agent_selector = None  # iterator برای ترتیب Agentها
        self.agent_selection = None  # نام Agentی که در نوبت است

    # ۴. متد کمکی برای تولید همهٔ تسک‌های اولیه با ویژگی‌های کامل
    def _generate_all_tasks(self):
        """
        تولید 2*MAX_TASKS تسک تصادفی:
          - arrival_time: عددی در [0, MAX_DEADLINE//2]
          - resources: [cpu_req, ram_req] هرکدام در [1, max(TOTAL_CPU, TOTAL_RAM)//2]
          - duration: عددی در [1, 4]
          - deadline: عددی >= arrival + duration تا حداکثر MAX_DEADLINE
          - priority: عددی در [1, MAX_PRIORITY]
        خروجی: لیستی از دیکشنری‌های task، مرتب‌شده بر اساس arrival_time
        """
        all_tasks = []
        num_total = 2 * MAX_TASKS
        for _ in range(num_total):
            arrival = np.random.randint(0, MAX_DEADLINE // 2)
            resource_req = np.random.randint(1, max(TOTAL_CPU, TOTAL_RAM) // 2, size=NUM_RESOURCES).tolist()
            duration = np.random.randint(1, 5)
            min_dead = arrival + duration
            max_dead = min_dead + (MAX_DEADLINE - min_dead)
            deadline = np.random.randint(min_dead, max_dead + 1) if max_dead > min_dead else min_dead
            priority = np.random.randint(1, MAX_PRIORITY + 1)

            task = {
                "resources": resource_req,
                "arrival_time": arrival,
                "duration": duration,
                "deadline": deadline,
                "priority": priority
            }
            all_tasks.append(task)

        # مرتب‌سازی بر اساس arrival_time
        all_tasks.sort(key=lambda t: t["arrival_time"])
        return all_tasks

    # ۵. متد reset برای شروع اپیزود جدید
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        # ۵.۱. صفر کردن زمان شبیه‌سازی
        self.current_time = 0

        # ۵.۲. تخصیص خودکار منابع (CPU و RAM) به NUM_MACHINES ماشین
        #      از روش Dirichlet-based با alpha=2.0 استفاده می‌کنیم تا تخصیص نسبتاً مساوی باشد.
        cpu_partition = dirichlet_integer_partition(TOTAL_CPU, NUM_MACHINES, alpha=2.0)
        ram_partition = dirichlet_integer_partition(TOTAL_RAM, NUM_MACHINES, alpha=2.0)

        self.resources = np.zeros((NUM_MACHINES, NUM_RESOURCES), dtype=np.int32)
        for i in range(NUM_MACHINES):
            self.resources[i, 0] = cpu_partition[i]
            self.resources[i, 1] = ram_partition[i]

        # ۵.۳. تولید تسک‌های آینده و خالی کردن صف
        self.future_tasks = self._generate_all_tasks()
        self.task_queue = []

        # ۵.۴. هیچ تسکی در حال اجرا نیست
        self.running_tasks = []

        # ۵.۵. مقداردهی اولیهٔ پاداش و پایانی‌ها و infos
        self.rewards = {agent: 0.0 for agent in self.agents}
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}  # در این مثال استفاده‌ای ندارد
        self.infos = {agent: {} for agent in self.agents}

        # ۵.۶. آماده‌سازی Iterator برای ترتیب Agentها
        self._agent_selector = iter(self.agents)
        self.agent_selection = next(self._agent_selector)

    # ۶. متد کمکی برای اضافه‌کردن تسک‌های تازه به صف (بر اساس current_time)
    def _add_arrived_tasks_to_queue(self):
        """
        هر تسکی که arrival_time <= current_time باشد وارد صف می‌شود،
        تا زمانی که ظرفیت صف (MAX_TASKS) پر نشده باشد.
        """
        while self.future_tasks and self.future_tasks[0]["arrival_time"] <= self.current_time:
            task = self.future_tasks.pop(0)
            if len(self.task_queue) < MAX_TASKS:
                self.task_queue.append(task)
            else:
                # اگر صف پر باشد، فعلاً آن تسک را نگه می‌داریم (و منتظر زمان بعدی می‌ماند)
                # برای سادگی در این مدل، اگر صف پر باشد، از future_tasks حذف نمی‌کنیم.
                break

    # ۷. متد کمکی برای آزادسازی منابع تسک‌های تمام‌شده
    def _release_completed_tasks(self):
        """
        هر Entry در running_tasks که finish_time <= current_time باشد،
        منابعش را به ماشین مربوطه باز می‌گرداند و از لیست running_tasks حذف می‌کند.
        """
        still_running = []
        for entry in self.running_tasks:
            if entry["finish_time"] <= self.current_time:
                m_idx = entry["machine"]
                t = entry["task"]
                # آزادسازی منابع
                self.resources[m_idx] += np.array(t["resources"], dtype=np.int32)
            else:
                still_running.append(entry)
        self.running_tasks = still_running

    # ۸. متد observe: تولید بردار مشاهده برای یک Agent مشخص
    def observe(self, agent):
        """
        خروجی: برداری از طول ثابت = NUM_RESOURCES + 1 + MAX_TASKS * (NUM_RESOURCES + 3)
        ساختار:
          [ rem_cpu_of_machine_i, rem_ram_of_machine_i,
            current_time,
            task_1_req_cpu, task_1_req_ram, task_1_duration, task_1_deadline_rem, task_1_priority,
            task_2_..., ...,
            تا MAX_TASKS تسک؛ اگر صف کمتر باشد، با صفر پد می‌شود.
          ]
        """
        i = int(agent.split('_')[1])  # شماره ماشین مربوط به این Agent

        # ۸.۱. منابع باقی‌ماندهٔ ماشین i
        flat_resources = self.resources[i].tolist()

        # ۸.۲. زمان فعلی
        tnow = [self.current_time]

        # ۸.۳. وضعیت صف انتظار
        flat_tasks = []
        for task in self.task_queue:
            flat_tasks.extend(task["resources"])                 # نیازهای منابع [cpu_req, ram_req]
            flat_tasks.append(task["duration"])                  # مدت اجرا
            rem_dead = max(0, task["deadline"] - self.current_time)
            flat_tasks.append(rem_dead)                           # زمان باقیمانده تا ددلاین
            flat_tasks.append(task["priority"])                   # اولویت

        # اگر تعداد تسک‌های صف < MAX_TASKS، با صفر پر می‌کنیم
        per_task_dim = NUM_RESOURCES + 3
        needed_padding = MAX_TASKS * per_task_dim - len(flat_tasks)
        if needed_padding > 0:
            flat_tasks.extend([0] * needed_padding)

        obs = np.array(flat_resources + tnow + flat_tasks, dtype=np.int32)
        return obs

    # ۹. متد step: دریافت یک action از Agent و به‌روزرسانی محیط
    def step(self, action):
        """
        دریافت action از Agent self.agent_selection:
          - action=0: skip (عدم اقدام)
          - action=k (1 <= k <= len(task_queue)): انتخاب تسک kام از صف برای اختصاص منابع
        اگر منابع کامل در ماشین i آزاد باشد:
          - منابع را یکجا کم می‌کنیم (All-or-Nothing Allocation)
          - ثبت finish_time = current_time + duration
          - محاسبه پاداش بر مبنای deadline و priority
          - حذف تسک از صف و اضافه کردن به running_tasks
        اگر منابع کافی نباشد:
          - پاداش منفی کوچک
          - تسک در صف باقی می‌ماند
        سپس:
          - انتخاب Agent بعدی (نوبتی)
          - اگر یک «دور کامل» عامل‌ها تمام شده باشد:
               * current_time += 1
               * آزادسازی منابع تسک‌هایی که تمام شدند
               * اضافه کردن تسک‌های جدید به صف بر اساس arrival_time
          - بررسی شرایط پایان اپیزود:
            اگر task_queue، running_tasks و future_tasks همگی خالی باشند، اپیزود تمام است.
        """

        agent = self.agent_selection
        i = int(agent.split('_')[1])  # شماره ماشین که این Agent مأمور تصمیم‌گیری است

        reward = 0.0

        # ==== ۹.۱. برسی و اعمال action ====
        if 1 <= action <= len(self.task_queue):
            task = self.task_queue[action - 1]

            # ۹.۱.۱. چک All-or-Nothing: آیا هر دو منابع (CPU و RAM) آزاد است؟
            if all(task["resources"][r] <= self.resources[i][r] for r in range(NUM_RESOURCES)):
                # منابع کافی است → All-or-Nothing Allocation
                self.resources[i] -= np.array(task["resources"], dtype=np.int32)

                # ثبت زمان پایان تسک
                finish_t = self.current_time + task["duration"]
                self.running_tasks.append({
                    "machine": i,
                    "finish_time": finish_t,
                    "task": task
                })

                # محاسبه پاداش:
                # اگر تسک پیش از ددلاین تمام شود، پاداش = priority؛ در غیر این صورت = -priority
                if finish_t <= task["deadline"]:
                    reward = float(task["priority"])
                else:
                    reward = float(-task["priority"])

                # حذف تسک از صف
                self.task_queue.pop(action - 1)
            else:
                # منابع کافی نیست → پاداش منفی کوچک و تسک را در صف نگه می‌دارد
                reward = -0.1
        else:
            # action == 0 یا خارج از محدوده → skip، پاداش صفر
            reward = 0.0

        # ثبت پاداش برای این Agent
        self.rewards[agent] = reward

        # ==== ۹.۲. انتخاب Agent بعدی ====
        try:
            next_agent = next(self._agent_selector)
            self.agent_selection = next_agent
            end_of_cycle = False
        except StopIteration:
            # یک دور کامل عامل‌ها تمام شد → دور جدید
            self._agent_selector = iter(self.agents)
            self.agent_selection = next(self._agent_selector)
            end_of_cycle = True

        # ==== ۹.۳. اگر یک دور کامل تمام شده باشد (end_of_cycle=True) ====
        if end_of_cycle:
            # افزایش زمان شبیه‌سازی
            self.current_time += 1

            # آزادسازی منابع تسک‌های تمام‌شده
            self._release_completed_tasks()

            # اضافه کردن تسک‌هایی که زمان ورودشان رسیده
            self._add_arrived_tasks_to_queue()

        # ==== ۹.۴. بررسی پایان اپیزود ====
        if not self.task_queue and not self.running_tasks and not self.future_tasks:
            for a in self.agents:
                self.terminations[a] = True

    # ۱۰. متد render: نمایش متنی وضعیت محیط
    def render(self):
        """
        چاپ وضعیت:
          - Current Time
          - منابع باقی‌ماندهٔ هر ماشین
          - صف انتظار: هر تسک با [req, duration, deadline_rem, priority]
          - تسک‌های در حال اجرا: هر Entry با [machine, finish_time, priority]
          - تعداد تسک‌های آینده (future_tasks)
        """
        print("========== Render Environment ==========")
        print(f"Current Time: {self.current_time}\n")
        for i in range(NUM_MACHINES):
            print(f"Machine {i} resources: CPU={self.resources[i,0]}, RAM={self.resources[i,1]}")

        print("\nQueue:")
        for idx, task in enumerate(self.task_queue):
            rem_dead = max(0, task["deadline"] - self.current_time)
            print(f"  Task {idx+1}: req={task['resources']}, dur={task['duration']}, "
                  f"deadline_rem={rem_dead}, prio={task['priority']}")

        print("\nRunning Tasks:")
        for entry in self.running_tasks:
            print(f"  On Machine {entry['machine']}: finish={entry['finish_time']}, "
                  f"prio={entry['task']['priority']}")

        print(f"\nFuture tasks remaining: {len(self.future_tasks)}")
        print("========================================\n")