In [None]:
# Cell 2: Imports and device setup [Runtime: ~5s]
import math
import numpy as np
import pandas as pd
import random
from copy import deepcopy
from typing import Dict, Any, Optional, List
import csv
import time
import warnings

# Suppress deprecation warnings from jupyter_client
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings('ignore', message='.*utcnow.*')

import torch
import matplotlib.pyplot as plt

import gymnasium as gym
from gymnasium import spaces

from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import BaseCallback, CallbackList
from stable_baselines3.common.utils import set_random_seed
from sb3_contrib.ppo_mask import MaskablePPO
from sb3_contrib.common.wrappers import ActionMasker
from sb3_contrib.common.maskable.utils import get_action_masks

# Let float32 matmuls use faster reduced-precision internals (e.g. TF32) where the
# backend supports them. This is best-effort: older PyTorch releases do not provide
# the function (AttributeError), and a backend may reject the setting (RuntimeError).
# Any other error is a real problem and is allowed to surface.
try:
    torch.set_float32_matmul_precision('high')
except (AttributeError, RuntimeError):
    pass

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
# NOTE(review): SEED is not applied in this cell; presumably later cells pass it to
# seeding / model-construction calls (e.g. set_random_seed) -- confirm.
SEED = 42  # Global seed for reproducibility
print(f'Using device: {DEVICE}')
if DEVICE == 'cuda':
    print(f'GPU: {torch.cuda.get_device_name(0)}')


In [None]:
# Cell 5: Improved Scheduling Environment [Runtime: instant]
class SchedulingEnvImproved(gym.Env):
    """Event-driven Gymnasium environment that places jobs on MIG GPU slices.

    How an episode proceeds:
      - Each ``step`` assigns the head of the ready queue to one free slice.
        ``_get_obs`` keeps that queue sorted earliest-deadline first.
      - While ready jobs and free slices both remain, ``step`` returns
        immediately with a zero reward.
      - Otherwise simulated time advances event by event.
      - After every event, all running jobs are preempted. Their remaining
        work is scaled down in the job table, they are re-queued, and every
        slice is freed, so the agent re-decides the whole placement.

    Args:
        gpu_config: Sequence of GPU profile keys. ``MIG_PROFILE[key]`` (defined
            elsewhere in the notebook) lists that GPU's slices, with the slice
            size in the first field.
        passing_in_queue: If True, ``queue`` is replayed on every reset.
            Otherwise ``create_queue()`` supplies a fresh queue per episode.
        queue: Job table with ``arrivals``, ``deadlines`` and
            ``g1/g2/g3/g4/g7_duration`` columns. It is copied, so the caller's
            DataFrame is never modified. An ``id`` column (index + 1) is added
            when ``passing_in_queue`` is set and the column is missing.
    """

    metadata = {"render_modes": []}

    # Left-closed bin edges shared by the four ready-queue histograms (10 bins each).
    _JOB_BINS = [-100, 0, 0.05, 0.2, 0.5, 1, 5, 10, 20, 30, np.inf]

    def __init__(self, gpu_config, passing_in_queue, queue):
        super().__init__()
        self.gpu_config = gpu_config
        self.passing_in_queue = passing_in_queue
        # Work on a private copy: step() rescales duration columns in place when it
        # preempts jobs, and the caller's DataFrame must not change underneath it.
        self.queue = queue.copy() if queue is not None else None
        if self.passing_in_queue and self.queue is not None and 'id' not in self.queue.columns:
            self.queue.insert(0, "id", self.queue.index + 1)
        # Untouched job table that reset() restores for replayed queues, so one
        # episode's preemption rescaling cannot leak into the next episode.
        self._initial_queue = self.queue.copy() if self.queue is not None else None
        self._reset_episode_state()

        n_gpus, n_slices = len(self.gpu_config), len(self.slices)
        self.observation_space = spaces.Dict({
            # Head job: [time to deadline, small/medium/large-slice duration] / TIME_SCALE.
            # NOTE(review): the first entry becomes negative once the deadline has
            # passed, which lies outside this Box's lower bound of 0. Changing the
            # bound would invalidate saved checkpoints, so it is left as-is.
            "next_job": spaces.Box(0, np.inf, shape=(4,), dtype=np.float32),
            # Four concatenated 10-bin histograms over the ready queue (see _get_obs).
            "ready_queue_dist": spaces.Box(0, 1, shape=(40,), dtype=np.float32),
            # One row per slice: [gpu_id, slice_id, size, busy].
            "slices": spaces.Box(
                low=np.tile(np.array([0, 0, 1, 0], dtype=np.float32), (n_slices, 1)),
                high=np.tile(np.array([n_gpus - 1, n_slices - 1, 7, 1], dtype=np.float32), (n_slices, 1)),
                shape=(n_slices, 4),
                dtype=np.float32,
            ),
            # [ready-queue length / MAX_QUEUE_SIZE capped at 1, fraction of free slices].
            "extras": spaces.Box(low=0.0, high=1.0, shape=(2,), dtype=np.float32),
        })
        self.action_space = spaces.Discrete(n_slices)

    def _reset_episode_state(self):
        """Zero the episode counters and rebuild every slice as free.

        Slice rows are ``[gpu_id, slice_id, size, busy]``. ``slice_id`` counts
        across all GPUs, so it is also the slice's index in ``self.slices``.
        """
        self.gpu_energy_dict = {}
        self.total_tardiness, self.num_late_jobs, self.total_energy_consumption = 0.0, 0, 0.0
        self.slices = []
        for gpu_id, gpu in enumerate(self.gpu_config):
            self.gpu_energy_dict[gpu_id] = 0.0
            for profile_slice in MIG_PROFILE[gpu]:
                self.slices.append([gpu_id, len(self.slices), int(profile_slice[0]), 0])

    def reset(self, *, seed=None, options=None):
        """Start a new episode at the earliest job arrival.

        Returns:
            ``(observation, {})``.

        Raises:
            IndexError: if the job queue is empty.
        """
        super().reset(seed=seed)
        self._reset_episode_state()
        if self.passing_in_queue:
            # Restore original durations; preemption in step() rescales them in place.
            self.queue = self._initial_queue.copy()
        else:
            self.queue = create_queue().copy()
        self.working_queue = []
        # One arrival event per job, earliest first. The sort is stable, so rows
        # that are already in arrival order keep their relative order.
        self.event_list = sorted(
            ((row.arrivals, 'arrival', row.id, row.deadlines) for row in self.queue.itertuples()),
            key=lambda event: event[0],
        )
        first_arrival = self.event_list.pop(0)
        self.now = first_arrival[0]
        self.working_queue.append(first_arrival[2])
        return self._get_obs(), {}

    @staticmethod
    def _proportion_in_bins(series, bins):
        """Return the share of ``series`` falling in each left-closed bin.

        Bins with no members get 0.0. Values outside the outermost edges become
        NaN in ``pd.cut`` and are excluded before normalising.
        """
        cut = pd.cut(series, bins=bins, right=False, include_lowest=True)
        return cut.value_counts(normalize=True).reindex(
            pd.IntervalIndex.from_breaks(bins, closed='left'), fill_value=0.0
        ).to_list()

    def _deadline_of(self, job_id):
        """Return the deadline of ``job_id`` (first matching row of ``self.queue``)."""
        return self.queue.loc[self.queue['id'] == job_id, 'deadlines'].iloc[0]

    def _get_obs(self):
        """Build the Dict observation for the current state.

        Side effect: re-sorts ``self.working_queue`` earliest-deadline first, so
        ``working_queue[0]`` is the job the next ``step`` places.
        """
        working_jobs = self.queue[self.queue['id'].isin(self.working_queue)]
        if working_jobs.empty:
            deadline_dist = avg_duration_small_dist = avg_duration_medium_dist = avg_duration_large_dist = [0.0] * 10
        else:
            bins = self._JOB_BINS
            # These histograms use raw queue time units (not divided by TIME_SCALE).
            deadline_dist = self._proportion_in_bins(working_jobs['deadlines'] - self.now, bins)
            avg_duration_small_dist = self._proportion_in_bins(
                (working_jobs['g1_duration'] + working_jobs['g2_duration']) / 2, bins)
            avg_duration_medium_dist = self._proportion_in_bins(
                (working_jobs['g3_duration'] + working_jobs['g4_duration']) / 2, bins)
            avg_duration_large_dist = self._proportion_in_bins(working_jobs['g7_duration'], bins)

        # Build the id -> deadline map once rather than filtering the whole table
        # per queued job. Job ids are assumed unique.
        deadline_by_id = dict(zip(self.queue['id'], self.queue['deadlines']))
        self.working_queue = sorted(self.working_queue, key=deadline_by_id.__getitem__)

        next_job_chars = np.array([0, 0, 0, 0], dtype=np.float32)
        if self.working_queue:
            jr = self.queue.loc[self.queue["id"] == self.working_queue[0]]
            deadline = jr['deadlines'].iloc[0]
            g1, g2, g3, g4, g7 = (jr[f'g{k}_duration'].iloc[0] for k in (1, 2, 3, 4, 7))
            next_job_chars = np.array([
                float((deadline - self.now) / TIME_SCALE),
                float(((g1 + g2) / 2) / TIME_SCALE),
                float(((g3 + g4) / 2) / TIME_SCALE),
                float(g7 / TIME_SCALE),
            ], dtype=np.float32)

        n_free = sum(1 for s in self.slices if s[3] == 0)
        return {
            "next_job": next_job_chars,
            "ready_queue_dist": np.array(
                deadline_dist + avg_duration_small_dist + avg_duration_medium_dist + avg_duration_large_dist,
                dtype=np.float32),
            "slices": np.array(self.slices, dtype=np.float32),
            "extras": np.array([
                min(len(self.working_queue) / MAX_QUEUE_SIZE, 1.0),
                n_free / len(self.slices) if self.slices else 0.0,
            ], dtype=np.float32),
        }

    def calculate_energy(self, gpu_id):
        """Bill ``gpu_id`` for the time since its last update and return that energy.

        The power value comes from ``mig_util_energy`` (defined elsewhere), keyed
        by the busy fraction of the GPU's 7 slice units at call time, with a
        default of 250 for unknown keys. Busy flags are read *now*, so call this
        before changing any flag on the GPU.
        """
        busy = [s for s in self.slices if s[0] == gpu_id and s[3] == 1]
        energy = mig_util_energy.get(sum(s[2] for s in busy) / 7, 250) * (self.now - self.gpu_energy_dict[gpu_id])
        self.total_energy_consumption += energy
        self.gpu_energy_dict[gpu_id] = self.now
        return energy

    def valid_action_mask(self):
        """Return a boolean array that is True for every currently free slice."""
        return np.array([s[3] == 0 for s in self.slices])

    def step(self, action):
        """Place the head-of-queue job on slice ``action`` and advance time if needed.

        Returns:
            The Gymnasium 5-tuple ``(obs, reward, terminated, truncated, info)``.
            ``info`` always carries ``action_mask``. It also carries
            ``total_energy`` whenever time advanced. The terminal step adds
            ``avg_tardiness``, ``num_late_jobs`` and ``total_jobs``.
        """
        sel = self.slices[action]
        # Bill this GPU up to `now` while the slice is still idle, then mark it busy.
        # Billing after the flip would charge the new job for time before it started.
        self.calculate_energy(sel[0])
        sel[3] = 1
        job_id = self.working_queue.pop(0)
        jr = self.queue.loc[self.queue["id"] == job_id]
        dur = jr[slice_dur_col_match[sel[2]]].iloc[0]
        # Completion event: (finish time, kind, job id, gpu id, slice id, start time).
        self.event_list.append((self.now + dur, 'completion', job_id, sel[0], sel[1], self.now))
        if self.working_queue and any(s[3] == 0 for s in self.slices):
            # More placements are possible at this instant: no time passes, no reward.
            return self._get_obs(), 0.0, False, False, {'action_mask': self.valid_action_mask()}

        step_tardiness, step_energy, num_completions = 0.0, 0.0, 0
        while True:
            self.event_list.sort(key=lambda x: x[0])
            if not self.event_list:
                break
            event = self.event_list.pop(0)
            self.now = event[0]
            # Settle every GPU's energy for the interval that just elapsed. Use the
            # busy flags in force during it, i.e. before the finished slice is freed.
            for gid in range(len(self.gpu_config)):
                step_energy += self.calculate_energy(gid)
            if event[1] == 'arrival':
                self.working_queue.append(event[2])
            elif event[1] == 'completion':
                num_completions += 1
                tardiness = max(0.0, self.now - self._deadline_of(event[2]))
                if tardiness > 0:
                    self.total_tardiness += tardiness
                    self.num_late_jobs += 1
                    step_tardiness += tardiness
                self.slices[event[4]][3] = 0
            # Preempt every job still running: shrink its remaining work in the job
            # table by the fraction already done and send it back to the ready queue.
            preempted = [e for e in self.event_list if e[1] == 'completion']
            self.event_list = [e for e in self.event_list if e[1] != 'completion']
            for e in preempted:
                elapsed = self.now - e[5]
                duration = e[0] - e[5]
                pct = elapsed / duration if duration > 1e-9 else 1.0  # guard zero-length runs
                idx = self.queue.loc[self.queue['id'] == e[2]].index[0]
                for col in ['g7_duration', 'g4_duration', 'g3_duration', 'g2_duration', 'g1_duration']:
                    self.queue.loc[idx, col] *= (1 - pct)
                # Shaping penalty for a preempted job already past its deadline.
                # This is not counted in total_tardiness.
                deadline = self._deadline_of(e[2])
                if deadline < self.now:
                    step_tardiness += self.now - deadline
                self.working_queue.append(e[2])
            for s in self.slices:
                s[3] = 0
            if self.working_queue or not self.event_list:
                break

        # Per-step reward: tardiness plus weighted energy, normalised per completion.
        reward = (-step_tardiness - 0.0000225 * step_energy) / (max(1, num_completions) * 1.0000225)
        terminated = not self.event_list and not self.working_queue
        if terminated:
            # NOTE(review): the per-step reward divides by n * 1.0000225, but this
            # divides by n * 0.0000225 + 1 (about 1 for realistic n), so the terminal
            # reward is effectively un-normalised total tardiness. If
            # n * (0.0000225 + 1) was intended, parentheses are missing. Confirm
            # before changing it, since the fix rescales training rewards.
            reward = (-self.total_tardiness - 0.0000225 * self.total_energy_consumption) / (len(self.queue) * 0.0000225 + 1)
            info = {
                'total_energy': self.total_energy_consumption,
                'avg_tardiness': self.total_tardiness / max(1, len(self.queue)),
                'num_late_jobs': self.num_late_jobs,
                'total_jobs': len(self.queue),
            }
        else:
            info = {'total_energy': self.total_energy_consumption}
        info['action_mask'] = self.valid_action_mask()
        return self._get_obs(), reward, terminated, False, info


In [None]:
# Cell 7: Heuristic baselines [Runtime: ~30s for quick test, ~5min for full eval]
SLICE_SIZE_IDX = 2  # column of the slice size in each obs["slices"] row


def heuristic_select_action(obs, mask, heuristic="EFT"):
    """Choose a free slice for the head-of-queue job using a fixed rule.

    The job's duration estimate depends on slice size: sizes 1-2 use the small
    estimate ``next_job[1]``, sizes 3-4 the medium one ``next_job[2]``, and
    anything else the large one ``next_job[3]``. Each free slice gets a score
    and the lowest score wins; ties go to the lower slice index.

    Rules:
    - EFT: the estimated duration itself (shortest run on that slice wins).
    - ENERGY_MIN: estimated duration * (0.5 + 0.1 * size), favouring small slices.
    - FIFO: the slice index (first free slice wins).
    - EDD: negative slice size (largest free slice wins). The ready queue is
      deadline-sorted, so the job being placed is already the most urgent.
    - anything else: a uniform random score from ``np.random``.

    Returns the chosen slice index. If no slice passes the mask, returns the
    first True mask position, or 0 when there is none.
    """
    features = obs["next_job"]
    small_est, medium_est, large_est = features[1], features[2], features[3]

    def score_of(position, size):
        if size in (1, 2):
            est = small_est
        elif size in (3, 4):
            est = medium_est
        else:
            est = large_est
        if heuristic == "EFT":
            return est
        if heuristic == "ENERGY_MIN":
            return est * (0.5 + 0.1 * size)
        if heuristic == "FIFO":
            return position
        if heuristic == "EDD":
            return -size
        return np.random.rand()

    scored = [
        (score_of(position, int(row[SLICE_SIZE_IDX])), position)
        for position, row in enumerate(obs["slices"])
        if mask[position]
    ]
    if scored:
        # min() returns the first minimum, matching a stable sort's first entry.
        _, best_position = min(scored, key=lambda pair: pair[0])
        return int(best_position)
    free_positions = np.where(mask)[0]
    return int(free_positions[0]) if len(free_positions) else 0

def run_heuristic_eval(n_queues=10, heuristic="EFT", seed=42, hour_range=24, verbose=True):
    """Roll out a heuristic policy on freshly generated job queues.

    Both ``np.random`` and ``random`` are seeded once up front, and each queue
    comes from ``create_queue``. Every queue runs one full episode of
    ``SchedulingEnvImproved`` with action masking, choosing actions via
    ``heuristic_select_action``.

    Args:
        n_queues: Number of queues (episodes) to evaluate.
        heuristic: One of "EFT", "ENERGY_MIN", "FIFO", "EDD". Any other value
            makes ``heuristic_select_action`` score slices randomly.
        seed: Seed for ``np.random`` and ``random``.
        hour_range: Hours of jobs to generate (24 = ~800 jobs, 4 = ~130 jobs).
        verbose: Print one line per queue plus the total wall-clock time.

    Returns:
        Dict with ``method``, mean/std of per-episode total energy, mean/std of
        per-episode average tardiness, mean fraction of late jobs, and ``n_queues``.
    """
    np.random.seed(seed)
    random.seed(seed)
    energies, tardiness_means, late_shares = [], [], []
    t0 = time.time()
    for q_idx in range(n_queues):
        jobs = create_queue(hour_range=hour_range).drop('id', axis=1)
        env = ActionMasker(SchedulingEnvImproved(GPU_CONFIG, True, jobs.copy()), mask_fn)
        obs, _ = env.reset()
        n_steps = 0
        finished = False
        while not finished:
            action = heuristic_select_action(obs, get_action_masks(env), heuristic)
            obs, _reward, terminated, truncated, info = env.step(action)
            finished = terminated or truncated
            n_steps += 1
        # The loop only exits on the final step, so `info` is the terminal summary.
        energies.append(info['total_energy'])
        tardiness_means.append(info['avg_tardiness'])
        late_shares.append(info['num_late_jobs'] / info['total_jobs'])
        if verbose:
            print(f"  Queue {q_idx+1}/{n_queues}: {info['total_jobs']} jobs, {n_steps} steps, tardiness={info['avg_tardiness']:.4f}")
    elapsed = time.time() - t0
    if verbose:
        print(f"  Completed in {elapsed:.1f}s")
    return {
        'method': f'Heuristic-{heuristic}',
        'mean_energy': float(np.mean(energies)),
        'std_energy': float(np.std(energies)),
        'mean_tardiness': float(np.mean(tardiness_means)),
        'std_tardiness': float(np.std(tardiness_means)),
        'mean_late_fraction': float(np.mean(late_shares)),
        'n_queues': n_queues,
    }

# Quick sanity test with SHORT queues (4 hours = ~130 jobs instead of 24 hours = ~800 jobs).
# The success line is printed only when every summary metric is finite, so a
# broken environment or heuristic fails loudly here instead of reporting a pass.
print("Quick sanity test with EFT heuristic (4-hour queues)...")
eft_result = run_heuristic_eval(n_queues=1, heuristic="EFT", hour_range=4)
_sanity_metrics = ('mean_tardiness', 'mean_energy', 'mean_late_fraction')
if not all(np.isfinite(eft_result[key]) for key in _sanity_metrics):
    raise RuntimeError(f"EFT sanity test failed: non-finite metrics in {eft_result}")
print(f"✓ EFT test passed: tardiness={eft_result['mean_tardiness']:.4f}, energy={eft_result['mean_energy']:.2f}")


# Predicted Improvement Analysis

## Bug Fixes Applied:
1. **Reward calculation** - Now correctly tracks completion count during event loop
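2. **EDD heuristic** - Fixed to place the head-of-queue job on the largest free slice (fastest completion); because the ready queue is kept sorted by deadline, that job is always the most urgent one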
3. **Division by zero** - Guarded preemption percentage calculation
4. **Reproducibility** - Added seeds for deterministic training

## Expected Improvements Over Baseline

| Improvement | Impact on Tardiness | Impact on Energy | Confidence |
|-------------|---------------------|------------------|------------|
| **LR Annealing** (3e-4 → 0) | -5% to -10% | ~0% | High |
| **Entropy Decay** (0.0005 → 0.0001) | -3% to -8% | -2% to -5% | Medium |
| **Deeper Network** ([256,256] → [256,256,128]) | -5% to -15% | -3% to -8% | Medium |
| **Larger Batch** (2048 → 4096) | -2% to -5% | ~0% | Medium |
| **More Epochs** (5 → 8) | -3% to -7% | -1% to -3% | Medium |
| **Tighter Clip** (0.2 → 0.15) | -2% to -5% | ~0% | Low |
| **Extras Observation** (queue_len, free_slices) | -5% to -10% | -3% to -7% | Medium |
| **Reward Bug Fix** | -5% to -15% | -5% to -10% | High |

## Overall Prediction

**Expected improvement over baseline:**
- **Tardiness**: **15-30% reduction** (from a baseline mean tardiness of ~X to ~0.70-0.85X)
- **Energy**: **5-15% reduction** (from a baseline mean energy of ~Y to ~0.85-0.95Y)
- **Late Job Fraction**: **10-25% reduction**

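*Note: These are estimates based on typical RL scheduling improvements, not measured results. The per-change effects in the table overlap and are not additive, which is why the overall prediction is smaller than their sum. Actual results depend on queue characteristics and randomness.*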
