In [None]:
from dataclasses import dataclass
from typing import List, Optional
import random

# 定義作業（Job）的數據結構
@dataclass
class Job:
    id: int
    type: str  # 'AR' (Advance Reservation), 'IM' (Immediate), 'BE' (Best Effort)
    start_time: int  # 作業開始時間
    finish_time: int  # 作業結束時間
    resource_demand: int  # 資源需求（如核心數）

# 定義虛擬機（VM）的數據結構
@dataclass
class VM:
    id: int
    host_id: int
    available_time: int  # 最早可用時間 (Earliest Available Time, EAT)
    assigned_jobs: List[Job]  # 已分配的作業隊列

# 定義主機（Host）的數據結構
@dataclass
class Host:
    id: int
    state: str  # 'Active', 'Idle', 'Standby'
    vms: List[VM]  # 主機上的虛擬機列表

# 模擬雲數據中心的調度器
class CloudScheduler:
    def __init__(self, hosts: List[Host], job_queue: List[Job]):
        self.hosts = hosts  # 數據中心中的主機列表
        self.job_queue = job_queue  # 待調度的作業隊列
        self.backfill_queue = []  # 用於 Best Effort 作業的後備隊列

    def find_earliest_available_host(self, job: Job) -> Optional[Host]:
        """尋找最早可用的主機（Active 或 Idle 狀態，且沒有與 AR 衝突）"""
        earliest_time = float('inf')
        selected_host = None
        for host in self.hosts:
            if host.state in ['Active', 'Idle']:
                for vm in host.vms:
                    # 檢查該 VM 是否被 AR 作業佔用且與當前作業時間衝突
                    ar_conflict = any(
                        j.type == 'AR' and 
                        not (j.finish_time <= job.start_time or j.start_time >= job.finish_time)
                        for j in vm.assigned_jobs
                    )
                    if not ar_conflict and vm.available_time < earliest_time:
                        earliest_time = vm.available_time
                        selected_host = host
        return selected_host

    def preempt(self, host: Host, job: Job) -> bool:
        """執行搶占邏輯：搶占 BE 作業以分配 AR 或 IM 作業"""
        for vm in host.vms:
            be_jobs = [j for j in vm.assigned_jobs if j.type == 'BE']
            for be_job in be_jobs:
                # 如果 BE 作業的時間與當前作業衝突，則搶占
                if not (be_job.finish_time <= job.start_time or be_job.start_time >= job.finish_time):
                    vm.assigned_jobs.remove(be_job)
                    self.backfill_queue.append(be_job)  # 將被搶占的 BE 作業放入後備隊列
                    vm.assigned_jobs.append(job)
                    vm.available_time = job.finish_time
                    return True
        return False

    def schedule_job(self, job: Job):
        """根據 EAVMAT 算法調度單個作業"""
        if job.type == 'BE':  # Best Effort 作業
            host = self.find_earliest_available_host(job)
            if host:
                for vm in host.vms:
                    if vm.available_time <= job.start_time:
                        vm.assigned_jobs.append(job)
                        vm.available_time = job.finish_time
                        host.state = 'Active'
                        return
            self.backfill_queue.append(job)  # 如果沒有可用資源，放入後備隊列

        elif job.type == 'IM':  # Immediate 作業
            host = self.find_earliest_available_host(job)
            if host:
                for vm in host.vms:
                    if vm.available_time <= job.start_time:
                        vm.assigned_jobs.append(job)
                        vm.available_time = job.finish_time
                        host.state = 'Active'
                        return
                if self.preempt(host, job):  # 嘗試搶占 BE 作業
                    host.state = 'Active'
                    return

        elif job.type == 'AR':  # Advance Reservation 作業
            host = self.find_earliest_available_host(job)
            if host:
                for vm in host.vms:
                    conflict = any(
                        j.finish_time > job.start_time and j.start_time < job.finish_time
                        for j in vm.assigned_jobs
                    )
                    if not conflict and vm.available_time <= job.start_time:
                        vm.assigned_jobs.append(job)
                        vm.available_time = job.finish_time
                        host.state = 'Active'
                        return
                if self.preempt(host, job):  # 嘗試搶占 BE 作業
                    host.state = 'Active'
                    return
            print(f"Job {job.id} (AR) rejected due to resource unavailability.")

    def run(self):
        """運行調度器，處理所有作業"""
        for job in self.job_queue:
            print(f"Scheduling Job {job.id} ({job.type})...")
            self.schedule_job(job)

# 模擬數據與測試
def main():
    # 初始化主機和虛擬機
    hosts = [
        Host(id=1, state='Active', vms=[VM(id=1, host_id=1, available_time=0, assigned_jobs=[])]),
        Host(id=2, state='Idle', vms=[VM(id=2, host_id=2, available_time=0, assigned_jobs=[])]),
        Host(id=3, state='Standby', vms=[VM(id=3, host_id=3, available_time=0, assigned_jobs=[])]),
    ]

    # 模擬作業隊列
    job_queue = [
        Job(id=1, type='AR', start_time=5, finish_time=10, resource_demand=1),
        Job(id=2, type='IM', start_time=2, finish_time=4, resource_demand=1),
        Job(id=3, type='BE', start_time=0, finish_time=3, resource_demand=1),
        Job(id=4, type='AR', start_time=8, finish_time=12, resource_demand=1),
    ]

    # 創建調度器並運行
    scheduler = CloudScheduler(hosts, job_queue)
    scheduler.run()

    # 輸出結果
    for host in hosts:
        for vm in host.vms:
            print(f"Host {host.id} (State: {host.state}), VM {vm.id}: Assigned Jobs: "
                  f"{[(j.id, j.type, j.start_time, j.finish_time) for j in vm.assigned_jobs]}")
    print(f"Backfill Queue: {[(j.id, j.type) for j in scheduler.backfill_queue]}")

if __name__ == "__main__":
    main()

In [2]:
from dataclasses import dataclass
from typing import List, Optional, Dict
from enum import Enum
import random

# 定義主機狀態的枚舉類
class HostState(Enum):
    ACTIVE = "Active"
    IDLE = "Idle"
    STANDBY = "Standby"

# 定義作業類型
class JobType(Enum):
    AR = "AR"  # Advance Reservation
    IM = "IM"  # Immediate
    BE = "BE"  # Best Effort

# 作業數據結構
@dataclass
class Job:
    id: int
    type: JobType
    start_time: int
    finish_time: int
    resource_demand: int

# 虛擬機數據結構
@dataclass
class VM:
    id: int
    host_id: int
    available_time: int = 0
    assigned_jobs: List[Job] = None

    def __post_init__(self):
        self.assigned_jobs = self.assigned_jobs or []

# 主機數據結構
@dataclass
class Host:
    id: int
    state: HostState
    vms: List[VM]

# 雲調度器類
class CloudScheduler:
    def __init__(self, hosts: List[Host], jobs: List[Job]):
        self.hosts: List[Host] = hosts
        self.job_queue: List[Job] = jobs
        self.backfill_queue: List[Job] = []

    def _find_earliest_available_host(self, job: Job) -> Optional[Host]:
        """尋找最早可用的主機，排除與 AR 作業衝突的情況"""
        earliest_host = None
        earliest_time = float('inf')

        for host in self.hosts:
            if host.state in (HostState.ACTIVE, HostState.IDLE):
                for vm in host.vms:
                    if self._is_vm_available(vm, job) and vm.available_time < earliest_time:
                        earliest_time = vm.available_time
                        earliest_host = host
        return earliest_host

    def _is_vm_available(self, vm: VM, job: Job) -> bool:
        """檢查 VM 是否可用，不與現有 AR 作業衝突"""
        return not any(
            j.type == JobType.AR and
            j.start_time < job.finish_time and j.finish_time > job.start_time
            for j in vm.assigned_jobs
        )

    def _preempt(self, host: Host, job: Job) -> bool:
        """搶占 BE 作業以分配 AR 或 IM 作業"""
        for vm in host.vms:
            be_jobs = [j for j in vm.assigned_jobs if j.type == JobType.BE]
            for be_job in be_jobs:
                if be_job.start_time < job.finish_time and be_job.finish_time > job.start_time:
                    vm.assigned_jobs.remove(be_job)
                    self.backfill_queue.append(be_job)
                    vm.assigned_jobs.append(job)
                    vm.available_time = job.finish_time
                    return True
        return False

    def schedule_job(self, job: Job) -> None:
        """調度單個作業"""
        host = self._find_earliest_available_host(job)

        match job.type:
            case JobType.BE:
                if host and any(vm.available_time <= job.start_time for vm in host.vms):
                    vm = next(vm for vm in host.vms if vm.available_time <= job.start_time)
                    vm.assigned_jobs.append(job)
                    vm.available_time = job.finish_time
                    host.state = HostState.ACTIVE
                else:
                    self.backfill_queue.append(job)

            case JobType.IM:
                if host:
                    for vm in host.vms:
                        if vm.available_time <= job.start_time:
                            vm.assigned_jobs.append(job)
                            vm.available_time = job.finish_time
                            host.state = HostState.ACTIVE
                            return
                    if self._preempt(host, job):
                        host.state = HostState.ACTIVE

            case JobType.AR:
                if host:
                    for vm in host.vms:
                        if self._is_vm_available(vm, job) and vm.available_time <= job.start_time:
                            vm.assigned_jobs.append(job)
                            vm.available_time = job.finish_time
                            host.state = HostState.ACTIVE
                            return
                    if self._preempt(host, job):
                        host.state = HostState.ACTIVE
                        return
                print(f"Job {job.id} (AR) rejected due to resource unavailability.")

    def run(self) -> None:
        """運行調度器，處理所有作業"""
        for job in self.job_queue:
            print(f"Scheduling Job {job.id} ({job.type.value})...")
            self.schedule_job(job)

    def get_results(self) -> Dict[str, any]:
        """返回調度結果"""
        return {
            "hosts": [
                {
                    "id": host.id,
                    "state": host.state.value,
                    "vms": [
                        {"id": vm.id, "jobs": [(j.id, j.type.value, j.start_time, j.finish_time) for j in vm.assigned_jobs]}
                        for vm in host.vms
                    ]
                }
                for host in self.hosts
            ],
            "backfill_queue": [(j.id, j.type.value) for j in self.backfill_queue]
        }

# 測試函數
def main():
    # 初始化主機和虛擬機
    hosts = [
        Host(id=1, state=HostState.ACTIVE, vms=[VM(id=1, host_id=1)]),
        Host(id=2, state=HostState.IDLE, vms=[VM(id=2, host_id=2)]),
        Host(id=3, state=HostState.STANDBY, vms=[VM(id=3, host_id=3)]),
    ]

    # 模擬作業隊列
    jobs = [
        Job(id=1, type=JobType.AR, start_time=5, finish_time=10, resource_demand=1),
        Job(id=2, type=JobType.IM, start_time=2, finish_time=4, resource_demand=1),
        Job(id=3, type=JobType.BE, start_time=0, finish_time=3, resource_demand=1),
        Job(id=4, type=JobType.AR, start_time=8, finish_time=12, resource_demand=1),
    ]

    # 創建並運行調度器
    scheduler = CloudScheduler(hosts, jobs)
    scheduler.run()

    # 輸出結果
    results = scheduler.get_results()
    for host in results["hosts"]:
        print(f"Host {host['id']} (State: {host['state']}):")
        for vm in host["vms"]:
            print(f"  VM {vm['id']}: Assigned Jobs: {vm['jobs']}")
    print(f"Backfill Queue: {results['backfill_queue']}")

if __name__ == "__main__":
    main()

Scheduling Job 1 (AR)...
Scheduling Job 2 (IM)...
Scheduling Job 3 (BE)...
Scheduling Job 4 (AR)...
Host 1 (State: Active):
  VM 1: Assigned Jobs: [(1, 'AR', 5, 10)]
Host 2 (State: Active):
  VM 2: Assigned Jobs: [(2, 'IM', 2, 4), (4, 'AR', 8, 12)]
Host 3 (State: Standby):
  VM 3: Assigned Jobs: []
Backfill Queue: [(3, 'BE')]


In [None]:

CPU_request_data = pd.read_csv('./data/final_df_sum_no_missing_hour_filled.csv')