In [None]:
# ===========================
# 1.Requirements
# ===========================
try:
    import numpy as np, pandas as pd, matplotlib, sklearn, simpy, networkx, tqdm, scipy
except ImportError as e:
    !pip install numpy pandas matplotlib scikit-learn scipy simpy networkx tqdm

# Optional
try:
    import xgboost
except ImportError:
    try:
        !pip install xgboost
    except:
        xgboost = None

try:
    import optuna
except ImportError:
    optuna = None

import os, random, json, math, time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import networkx as nx
import simpy
from tqdm import tqdm
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.linear_model import SGDRegressor
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
from scipy import stats

Collecting simpy
  Downloading simpy-4.1.1-py3-none-any.whl.metadata (6.1 kB)
Downloading simpy-4.1.1-py3-none-any.whl (27 kB)
Installing collected packages: simpy
Successfully installed simpy-4.1.1


In [None]:
# ===========================
# 2.CONFIG
# ===========================
# Global experiment configuration shared by every later cell.
# Sizes are task/VM counts; RUNTIME_MINUTES is multiplied by 60 wherever it is
# used as a simulation horizon, and SCHED_TICK_MS is divided by 1000 by the simulator.
CONFIG = {
    "SEED": 42,
    "SMOKE_TEST": False,   # Set True to apply the reduced sizes in the update() below
    "NUM_TASKS": 50000,
    "NUM_VMS": 300,
    "RUNTIME_MINUTES": 120,
    "SCHED_TICK_MS": 250,
    "TRAIN_WARMUP_TASKS": 2000,   # samples the hybrid scheduler collects before its first fit
    "ONLINE_UPDATE_FREQ": 200,    # hybrid scheduler refits whenever its sample count is a multiple of this
    # Colab exposes /content; anywhere else fall back to a local artifacts folder.
    "RESULTS_DIR": "/content" if os.path.exists("/content") else "./artifacts",
}

# Smoke-test mode shrinks the workload so the notebook finishes quickly.
if CONFIG["SMOKE_TEST"]:
    CONFIG.update({
        "NUM_TASKS": 2000,
        "NUM_VMS": 20,
        "RUNTIME_MINUTES": 10,
    })

# Create the output folder up front and seed both global RNGs used by the notebook.
os.makedirs(CONFIG["RESULTS_DIR"], exist_ok=True)
np.random.seed(CONFIG["SEED"])
random.seed(CONFIG["SEED"])

In [None]:
# ===========================
# 3) Synthetic Workload Generator
# ===========================

def generate_vms(num_vms, seed=0, pattern=None):
    np.random.seed(seed)
    vms = []
    for i in range(num_vms):
        if pattern == "heterogeneous":
            cores = np.random.choice([2, 4, 8, 16, 32])
            ghz = max(1.0, np.random.normal(loc=np.random.choice([1.8, 2.5, 3.5]), scale=0.7))
            mem = np.random.choice([2, 4, 8, 16, 32, 64])
            iops = np.random.lognormal(mean=2.5 + np.random.rand(), sigma=0.9)
            arch = np.random.choice(["x86", "ARM", "RISC"])
            loc = np.random.choice(["edge", "cloud"], p=[0.6, 0.4])
        else:
            cores = np.random.choice([2, 4, 8, 16])
            ghz = max(1.0, np.random.normal(loc=2.5, scale=0.5))
            mem = np.random.choice([4, 8, 16, 32])
            iops = np.random.lognormal(mean=3, sigma=0.5)
            arch = np.random.choice(["x86", "ARM"])
            loc = "edge" if np.random.rand() < 0.6 else "cloud"
        vms.append([i, cores, ghz, mem, iops, arch, loc])
    return pd.DataFrame(vms, columns=["vm_id", "cores", "cpu_ghz", "mem_gb", "disk_iops", "arch", "loc"])

def generate_tasks(num_tasks, runtime_minutes, seed=0, pattern=None):
    """Build a synthetic task trace.

    Reseeds NumPy's global RNG with ``seed``. Base arrival times are the
    cumulative sum of Poisson(5) gaps, wrapped modulo the run length
    (``runtime_minutes * 60``). With ``pattern == "bursty"`` the first 400 of
    every 500 tasks get Gaussian jitter (sigma 10) on their arrival time.

    Arrivals are kept as floats: the base array is integer-valued, and
    assigning the jittered value back into an integer array would silently
    truncate the jitter toward zero.

    Returns:
        pandas.DataFrame with columns ``task_id, arrival, cpu_req, mem_req,
        io_intensity, input_mb, deadline, ilp, type``. ``deadline`` is ``None``
        (NaN in the frame) for the tasks that draw no deadline (the draw gives
        one with probability 0.3).
    """
    np.random.seed(seed)
    horizon_s = runtime_minutes * 60
    gaps = np.random.poisson(lam=5, size=num_tasks)
    # Cast to float so fractional jitter survives (see docstring).
    arrivals = (np.cumsum(gaps) % horizon_s).astype(float)

    heterogeneous = pattern == "heterogeneous"
    types = ["batch", "interactive", "ml"]
    tasks = []
    for i in range(num_tasks):
        arrival = float(arrivals[i])
        if pattern == "bursty" and i % 500 < 400:
            arrival += np.random.normal(0, 10)
        # Clamp arrival time >= 0 to prevent simulation errors
        arrival = max(0.0, arrival)

        cpu = np.random.lognormal(mean=3, sigma=1)
        mem = np.random.choice([1, 2, 4, 8])
        io_bonus = 0.3 if heterogeneous and i % 2 == 0 else 0
        io = np.clip(np.random.beta(2, 5) + io_bonus, 0, 1)
        size = np.random.exponential(scale=100 if heterogeneous else 50)
        # Draw the coin first, then the slack, to keep the RNG call order stable.
        has_deadline = np.random.rand() < 0.3
        deadline = arrival + np.random.uniform(10, 100) if has_deadline else None
        ilp = np.random.uniform(0.5, 2.0)
        ttype = np.random.choice(types)
        tasks.append([i, arrival, cpu, mem, io, size, deadline, ilp, ttype])
    return pd.DataFrame(tasks, columns=["task_id", "arrival", "cpu_req", "mem_req", "io_intensity", "input_mb", "deadline", "ilp", "type"])

# Example for testing: build the notebook-wide VM and task frames from CONFIG.
# These two frames (vms_df, tasks_df) are reused by later cells.
vms_df = generate_vms(CONFIG["NUM_VMS"], CONFIG["SEED"], pattern="heterogeneous")
tasks_df = generate_tasks(CONFIG["NUM_TASKS"], CONFIG["RUNTIME_MINUTES"], CONFIG["SEED"], pattern="bursty")

print("VMs preview:\n", vms_df.head())
print("Tasks preview:\n", tasks_df.head())

# Fail fast if either generator produced an empty frame.
assert len(vms_df)>0 and len(tasks_df)>0


VMs preview:
    vm_id  cores   cpu_ghz  mem_gb  disk_iops  arch    loc
0      0     16  2.180620      32   8.183598  RISC   edge
1      1      8  2.905449      64  25.715729   ARM   edge
2      2     16  2.823665      32  74.947547   x86   edge
3      3      4  4.525954       8  14.574353   x86   edge
4      4      8  3.079822      32  36.002546  RISC  cloud
Tasks preview:
    task_id  arrival    cpu_req  mem_req  io_intensity   input_mb    deadline  \
0        0       11  32.786512        8      0.237605   7.017056         NaN   
1        1       17  43.767978        4      0.582252  22.698222   98.623023   
2        2       12  15.284867        2      0.203930  39.212414         NaN   
3        3        8  16.645953        8      0.106794  15.368639         NaN   
4        4       22   9.997307        1      0.586072   2.476741  119.423401   

        ilp         type  
0  1.412203        batch  
1  1.895989  interactive  
2  1.592084  interactive  
3  1.456554           ml  
4  1.1

In [None]:
# ===============================
# 4) Discrete-Event Simulation
# ===============================

class Task:
    """A submitted task: static requirements plus per-run bookkeeping.

    The constructor copies the workload fields from ``row`` (any object that
    exposes them as attributes, e.g. an ``itertuples()`` row) and initialises
    every runtime statistic to ``None`` until the simulator fills it in.
    """

    # Fields copied verbatim from the input row.
    _ROW_FIELDS = (
        "task_id", "arrival", "cpu_req", "mem_req", "io_intensity",
        "input_mb", "deadline", "ilp", "type",
    )
    # Runtime stats, unset until the task is started.
    _STAT_FIELDS = (
        "wait_time", "start_time", "finish_time",
        "vm_id", "transfer_time", "compute_time",
    )

    def __init__(self, row):
        for field in self._ROW_FIELDS:
            setattr(self, field, getattr(row, field))
        for field in self._STAT_FIELDS:
            setattr(self, field, None)

class VM:
    """Resource model of a VM that runs one task at a time.

    Hardware attributes are copied from ``row``; ``busy_until`` holds the
    simulation time at which the VM becomes available again.
    """

    _ROW_FIELDS = ("vm_id", "cores", "cpu_ghz", "mem_gb", "disk_iops", "arch", "loc")

    def __init__(self,row):
        for field in self._ROW_FIELDS:
            setattr(self, field, getattr(row, field))
        self.busy_until = 0.0

    def is_free(self, now):
        """Return True when the VM's current reservation has ended by ``now``."""
        return self.busy_until <= now

class NetworkModel:
    """Simplified network model whose latency and bandwidth vary with time.

    Both quantities are a per-location base value modulated by a sine of the
    simulation clock. Constructing the model reseeds NumPy's global RNG.
    """

    def __init__(self, seed=0):
        np.random.seed(seed)
        self.base_latency = dict(edge=10, cloud=50)
        self.base_bw = dict(edge=100, cloud=1000)  # Mbps

    def transfer_time(self, src_loc, dst_loc, mb, now):
        """Return latency plus transmission time for moving ``mb`` to ``dst_loc``.

        ``src_loc`` is accepted but not used by the computation. Bandwidth is
        floored at 10 before dividing.
        """
        latency = self.base_latency[dst_loc] * (1 + 0.1*np.sin(now/600))
        bandwidth = max(10, self.base_bw[dst_loc] * (0.8+0.4*np.sin(now/300)))
        latency_part = latency/1000
        transmission_part = (mb*8)/bandwidth
        return latency_part + transmission_part

class Simulator:
    """SimPy-based simulator that delegates placement to a pluggable scheduler.

    The scheduler is built as ``scheduler_cls(self, config)`` and is expected
    to expose ``schedule(now)``; it pulls tasks from ``self.queue`` and calls
    ``start_task`` to place them.
    """

    def __init__(self, tasks_df, vms_df, scheduler_cls, config):
        self.env = simpy.Environment()
        self.tasks = list(map(Task, tasks_df.itertuples()))
        vm_table = {}
        for vm_row in vms_df.itertuples():
            vm_table[vm_row.vm_id] = VM(vm_row)
        self.vms = vm_table
        # The scheduler is created before the remaining attributes exist,
        # so its constructor may only rely on env, tasks and vms.
        self.scheduler = scheduler_cls(self, config)
        self.config = config
        self.now = 0
        self.queue = []
        self.completed = []
        self.net_model = NetworkModel(config['SEED'])

    def run(self):
        """Register one arrival process per task plus the scheduler loop, then run."""
        for pending in self.tasks:
            self.env.process(self.task_arrival(pending))
        self.env.process(self.schedule_loop())
        horizon = self.config["RUNTIME_MINUTES"]*60
        self.env.run(until=horizon)

    def task_arrival(self, task):
        """SimPy process: wait until the task's arrival time, then enqueue it."""
        yield self.env.timeout(task.arrival)
        self.queue.append(task)

    def schedule_loop(self):
        """SimPy process: invoke the scheduler once per configured tick, forever."""
        while True:
            tick = self.config["SCHED_TICK_MS"]/1000
            yield self.env.timeout(tick)
            self.now = self.env.now
            self.scheduler.schedule(self.now)

    def start_task(self,task,vm,now):
        """Place ``task`` on ``vm`` and record its timings.

        Returns False without side effects when the VM is busy at ``now``.
        Otherwise all timing fields are computed immediately, the VM is
        reserved until the task's finish time, and the task is appended to
        ``self.completed`` right away (no completion event is scheduled).
        The caller is responsible for removing the task from the queue.
        """
        if not vm.is_free(now):
            return False

        transfer = self.net_model.transfer_time("edge", vm.loc, task.input_mb, now)
        # Hidden ground-truth runtime function: ideal time scaled by
        # log-normal noise and a small I/O penalty.
        ideal = task.cpu_req/(vm.cores*vm.cpu_ghz*task.ilp)
        jitter = np.random.lognormal(mean=0, sigma=0.25)
        runtime = ideal*jitter*(1+0.1*task.io_intensity)

        begin = now+transfer
        end = begin+runtime
        task.wait_time = now-task.arrival
        task.start_time = begin
        task.transfer_time = transfer
        task.compute_time = runtime
        task.vm_id = vm.vm_id
        task.finish_time = end
        vm.busy_until = end
        self.completed.append(task)
        return True

# Sanity test: construct (but do not run) a simulator on a small slice.
# The placeholder scheduler factory returns None, so run() must not be called here.
test_sim = Simulator(tasks_df.head(1000), vms_df.head(100), scheduler_cls=lambda sim, cfg: None, config=CONFIG)
print("Simulator initialized with", len(test_sim.tasks), "tasks and", len(test_sim.vms), "VMs")
assert isinstance(test_sim.tasks[0], Task)
# vms is a dict keyed by vm_id, so [0] looks up the VM whose id is 0.
assert isinstance(test_sim.vms[0], VM)

Simulator initialized with 1000 tasks and 100 VMs


In [None]:
# ===============================
# 5) Baseline Schedulers
# ===============================

class BaseScheduler:
    """Common base for all schedulers.

    Holds a reference to the owning simulator and the experiment config.
    Subclasses implement ``schedule(now)``, which the simulator calls once
    per scheduling tick.
    """

    def __init__(self, sim:Simulator, config):
        self.sim, self.config = sim, config

    def schedule(self, now):
        """Place queued tasks at simulation time ``now``; must be overridden."""
        raise NotImplementedError

class RandomScheduler(BaseScheduler):
    """Assign each queued task to a uniformly random free VM."""

    def schedule(self, now):
        """Place queued tasks in queue order until no free VM remains.

        The free-VM list is built once per tick and shrunk as VMs are taken,
        instead of rescanning every VM for every queued task.
        """
        free_vms = [vm for vm in self.sim.vms.values() if vm.is_free(now)]
        for task in list(self.sim.queue):
            if not free_vms:
                return
            vm = random.choice(free_vms)
            if self.sim.start_task(task, vm, now):
                self.sim.queue.remove(task)
                # A started task reserves the VM past `now`; drop it from the pool.
                free_vms.remove(vm)

class FCFS(BaseScheduler):
    """First-come-first-served: the head of the queue takes the next free VM."""

    def schedule(self,now):
        """Walk the VMs that are free at ``now`` and hand each the oldest queued task."""
        pending = self.sim.queue
        if not pending:
            return
        idle_vms = [vm for vm in self.sim.vms.values() if vm.is_free(now)]
        for vm in idle_vms:
            if not pending:
                break
            head = pending[0]
            started = self.sim.start_task(head, vm, now)
            if started:
                pending.remove(head)

class RoundRobin(BaseScheduler):
    """Cycle through the VMs in a fixed order, resuming where the last tick stopped."""

    def __init__(self,sim,config):
        super().__init__(sim,config)
        self.idx = 0
        self.vm_ids = list(sim.vms.keys())

    def schedule(self,now):
        """For each queued task, probe at most one full lap of VMs for a free one."""
        lap = len(self.vm_ids)
        for task in list(self.sim.queue):
            for _ in range(lap):
                candidate = self.sim.vms[self.vm_ids[self.idx]]
                # Advance the cursor whether or not this VM ends up being used.
                self.idx = (self.idx+1) % lap
                if not candidate.is_free(now):
                    continue
                if self.sim.start_task(task, candidate, now):
                    self.sim.queue.remove(task)
                    break

class MinMin(BaseScheduler):
    """Greedy scheduler: each queued task takes the free VM with the smallest
    estimated runtime ``cpu_req / (cores * cpu_ghz * ilp)``."""

    def schedule(self,now):
        """Place queued tasks in queue order until the free-VM pool is empty.

        A VM is removed from the pool as soon as it has been tried. Without
        this the estimate's arg-min is the same VM for every task (the
        task-specific factors scale all VMs equally), so only one task
        could start per tick.
        """
        free_vms=[vm for vm in self.sim.vms.values() if vm.is_free(now)]
        for t in list(self.sim.queue):
            if not free_vms: return
            vm=min(free_vms,key=lambda v: t.cpu_req/(v.cores*v.cpu_ghz*t.ilp))
            started=self.sim.start_task(t,vm,now)
            # Started or busy, the VM cannot take another task this tick.
            free_vms.remove(vm)
            if started:
                self.sim.queue.remove(t)

class MaxMin(BaseScheduler):
    """Greedy scheduler: each queued task takes the free VM with the largest
    estimated runtime ``cpu_req / (cores * cpu_ghz * ilp)``."""

    def schedule(self,now):
        """Place queued tasks in queue order until the free-VM pool is empty.

        A VM is removed from the pool as soon as it has been tried. Without
        this the estimate's arg-max is the same VM for every task, so only
        one task could start per tick.
        """
        free_vms=[vm for vm in self.sim.vms.values() if vm.is_free(now)]
        for t in list(self.sim.queue):
            if not free_vms: return
            vm=max(free_vms,key=lambda v: t.cpu_req/(v.cores*v.cpu_ghz*t.ilp))
            started=self.sim.start_task(t,vm,now)
            # Started or busy, the VM cannot take another task this tick.
            free_vms.remove(vm)
            if started:
                self.sim.queue.remove(t)

# Sanity check: run the random baseline end-to-end on a tiny slice.
# Note that the simulator appends a task to `completed` when it is started.
test_sim = Simulator(tasks_df.head(50), vms_df.head(5), scheduler_cls=RandomScheduler, config=CONFIG)
test_sim.run()
print("Completed", len(test_sim.completed),"tasks in quick sanity sim")
assert len(test_sim.completed)>0


Completed 50 tasks in quick sanity sim


In [None]:
# ===============================
# 6) Hybrid Predictive Scheduler
# ===============================


class HybridScheduler(BaseScheduler):
    """Predictive scheduler with a heuristic warm-up phase.

    Until ``warmup_count`` (task, VM) samples have been collected, tasks go to
    the free VM with the smallest analytic runtime estimate. After that a
    gradient-boosting regressor predicts the compute time for every free VM
    and the task goes to the VM minimising predicted compute + transfer time.
    The model is refit on the full history whenever the number of collected
    samples is a multiple of ``online_update_freq``.
    """

    def __init__(self, sim, config):
        super().__init__(sim, config)
        self.scaler = StandardScaler()
        self.model = GradientBoostingRegressor(
            n_estimators=120, max_depth=4, learning_rate=0.07, random_state=config["SEED"]
        )
        self.history_X = []
        self.history_y = []
        self.is_model_ready = False
        self.warmup_count = config.get("TRAIN_WARMUP_TASKS", 1000)
        self.online_update_freq = config.get("ONLINE_UPDATE_FREQ", 100)

    # Robust feature extractor - cleans NaNs/infs
    def feature_vector(self, task, vm, now):
        """Return the 21-element feature vector for placing ``task`` on ``vm`` at ``now``.

        NaN/inf entries are replaced by -1; a missing deadline is encoded as -1.
        """
        cpu_util = task.cpu_req / (vm.cores * vm.cpu_ghz + 1e-6)
        mem_util = task.mem_req / (vm.mem_gb + 1e-6)
        io_vs_disk = task.io_intensity / (vm.disk_iops + 1e-6)
        input_log = np.log1p(task.input_mb)
        tod = (now % 86400) / 86400.0
        has_deadline = task.deadline is not None and not np.isnan(task.deadline)
        deadline_feat = (task.deadline - now) if has_deadline else -1
        # The VM generator emits "ARM" in upper case; compare case-insensitively
        # so this indicator is not constantly zero.
        is_arm = str(vm.arch).lower() == "arm"
        features = np.array([
            task.cpu_req, task.mem_req, task.io_intensity, input_log,
            int(task.type=="ml"), int(task.type=="batch"), int(task.type=="interactive"),
            task.ilp,
            vm.cores, vm.cpu_ghz, vm.mem_gb, vm.disk_iops,
            int(is_arm), int(vm.loc=="cloud"),
            cpu_util, mem_util, io_vs_disk, tod, np.sin(2*np.pi*tod), np.cos(2*np.pi*tod),
            deadline_feat
        ])
        features = np.nan_to_num(features, nan=-1, posinf=-1, neginf=-1)
        return features

    def _refit(self):
        """Refit scaler and model on all NaN-free history rows; return the row count."""
        Xarr = np.vstack(self.history_X)
        yarr = np.array(self.history_y)
        valid_rows = ~np.isnan(Xarr).any(axis=1) & ~np.isnan(yarr)
        Xarr = Xarr[valid_rows]
        yarr = yarr[valid_rows]
        self.scaler.fit(Xarr)
        self.model.fit(self.scaler.transform(Xarr), yarr)
        return len(Xarr)

    def _record_sample(self, task, vm, now):
        """Append the (features, observed compute time) pair; return True if stored."""
        X = self.feature_vector(task, vm, now)
        y = task.compute_time
        if np.isnan(X).any() or np.isnan(y):
            return False
        self.history_X.append(X)
        self.history_y.append(y)
        return True

    def _pick_by_heuristic(self, task, free_vms):
        """Warm-up choice: free VM with the smallest analytic runtime estimate."""
        ilp = task.ilp if task.ilp > 0 else 1
        return min(free_vms, key=lambda vm: task.cpu_req / (vm.cores * vm.cpu_ghz * ilp))

    def _pick_by_prediction(self, task, free_vms, now):
        """Model choice: free VM minimising predicted compute time plus transfer time."""
        feats = np.vstack([self.feature_vector(task, vm, now) for vm in free_vms])
        try:
            # One batched call instead of one predict() per VM.
            preds = self.model.predict(self.scaler.transform(feats))
        except Exception as e:
            print(f"Prediction error: {e}")
            preds = np.full(len(free_vms), 1e9)
        totals = [
            pred + self.sim.net_model.transfer_time("edge", vm.loc, task.input_mb, now)
            for vm, pred in zip(free_vms, preds)
        ]
        best = min(range(len(free_vms)), key=totals.__getitem__)
        return free_vms[best]

    def schedule(self, now):
        """Place queued tasks (queue order) on the VMs that are free at ``now``."""
        free_vms = [vm for vm in self.sim.vms.values() if vm.is_free(now)]
        if not self.sim.queue or not free_vms:
            return

        for task in list(self.sim.queue):
            if not free_vms:
                break
            predictive = self.is_model_ready
            if predictive:
                vm = self._pick_by_prediction(task, free_vms, now)
            else:
                vm = self._pick_by_heuristic(task, free_vms)

            started = self.sim.start_task(task, vm, now)
            # Started or busy, the VM cannot take another task this tick. Keeping
            # it in the pool would make later tasks pick the same VM and fail.
            free_vms.remove(vm)
            if not started:
                continue
            self.sim.queue.remove(task)
            recorded = self._record_sample(task, vm, now)

            if not predictive:
                if len(self.history_X) >= self.warmup_count:
                    n_rows = self._refit()
                    self.is_model_ready = True
                    print(f"[HybridScheduler] Model trained at t={now} with {n_rows} samples.")
            elif recorded and len(self.history_X) % self.online_update_freq == 0:
                n_rows = self._refit()
                print(f"[HybridScheduler] Online update at t={now} ({n_rows} samples)")



In [None]:
# ===============================
# 7) Training & Online Learning Diagnostics (Fixed)
# ===============================
def compute_prediction_error(history_X, history_y, model, scaler):
    """Score ``model`` on the collected history.

    Returns ``(mae, mape)``, or ``None`` when fewer than 50 samples were
    supplied or no NaN-free row remains after filtering.
    """
    if len(history_X) < 50:
        return None

    features = np.vstack(history_X)
    targets = np.array(history_y)
    # Keep only rows where neither the features nor the target contain NaN.
    keep = ~(np.isnan(features).any(axis=1) | np.isnan(targets))
    features, targets = features[keep], targets[keep]
    if not len(features) or not len(targets):
        return None

    predicted = model.predict(scaler.transform(features))
    return (
        mean_absolute_error(targets, predicted),
        mean_absolute_percentage_error(targets, predicted),
    )

def stable_compute_learning_curve(hybrid_scheduler, outdir, model_params=None):
    """Fit a fresh model on growing prefixes of the scheduler's history.

    For every prefix length that is a multiple of 200 (excluding the full
    length itself) a new scaler and gradient-boosting model are fit, and
    MAE/MAPE are measured on that same prefix (in-sample error). When at
    least one point was computed, the curve is saved to
    ``<outdir>/learning_curve.png``.

    Returns:
        pandas.DataFrame with columns ``num, mae, mape`` (possibly empty).
    """
    window = 200
    total = len(hybrid_scheduler.history_X)
    records = []

    for upto in range(window, total, window):
        try:
            feats = np.vstack(hybrid_scheduler.history_X[:upto])
            targets = np.array(hybrid_scheduler.history_y[:upto])
            keep = ~(np.isnan(feats).any(axis=1) | np.isnan(targets))
            feats, targets = feats[keep], targets[keep]
            if len(feats) < 50:
                continue

            scaler = StandardScaler()
            scaler.fit(feats)
            if model_params is not None:
                model = GradientBoostingRegressor(**model_params)
            else:
                model = GradientBoostingRegressor(n_estimators=120, max_depth=4, learning_rate=0.07)
            model.fit(scaler.transform(feats), targets)
            fitted = model.predict(scaler.transform(feats))

            records.append((
                upto,
                mean_absolute_error(targets, fitted),
                mean_absolute_percentage_error(targets, fitted),
            ))
        except Exception as e:
            # A failed prefix is reported and skipped; the remaining ones still run.
            print(f"Diagnostic fitting failed at {upto} samples: {e}")

    errs = pd.DataFrame(records, columns=["num", "mae", "mape"])
    if errs.empty:
        print("No diagnostics computed.")
        return errs

    plt.figure(figsize=(6, 4))
    for column, label in (("mae", "MAE"), ("mape", "MAPE")):
        plt.plot(errs["num"], errs[column], label=label)
    plt.xlabel("Training samples")
    plt.ylabel("Error")
    plt.legend()
    plt.title("Prediction Error over Online Training")
    plt.tight_layout()
    plt.savefig(os.path.join(outdir, "learning_curve.png"))
    plt.close()
    return errs


In [None]:
# ===============================
# 8) Evaluation Protocol
# ===============================
# Per-scenario overrides merged on top of CONFIG by run_experiment.
# "PATTERN" is forwarded to both generate_tasks and generate_vms; each generator
# only reacts to the pattern names it checks for and treats the rest as default.
SCENARIOS = {
    "stationary": {
        "NUM_TASKS": 3000, "NUM_VMS": 50, "RUNTIME_MINUTES": 30,
        "PATTERN": "stationary"
    },
    "bursty": {
        "NUM_TASKS": 3000, "NUM_VMS": 50, "RUNTIME_MINUTES": 30,
        "PATTERN": "bursty"
    },
    "heterogeneous": {
        "NUM_TASKS": 3000, "NUM_VMS": 50, "RUNTIME_MINUTES": 30,
        "PATTERN": "heterogeneous"
    }
}
# Display name -> scheduler class. The display names become the "scheduler"
# column of the result frames, so downstream cells must use these exact labels.
SCHEDULERS = {
    "Random": RandomScheduler,
    "FCFS": FCFS,
    "RoundRobin": RoundRobin,
    "MinMin": MinMin,
    "MaxMin": MaxMin,
    "Proposed Method": HybridScheduler,
}


def run_experiment(scenario_name, config, seed):
    """Run every scheduler in ``SCHEDULERS`` on one scenario/seed combination.

    The scenario overrides are merged into a copy of ``config``; one task
    trace and one VM inventory are generated and shared by all schedulers.

    Args:
        scenario_name: key of ``SCENARIOS``.
        config: base configuration dict (not modified).
        seed: base seed; a scenario-specific offset is added to it.

    Returns:
        pandas.DataFrame with one row per scheduler and columns
        ``scenario, scheduler, ACT, makespan, p95, deadline_miss``. A scheduler
        that started no task gets NaN metrics instead of raising.
    """
    import zlib  # local import: stable hashing for the per-scenario seed offset

    cfg = config.copy()
    cfg.update(SCENARIOS[scenario_name])
    # Built-in hash() of a str is salted per interpreter process, which made the
    # seed (and every result) change between kernel restarts. CRC32 is stable.
    scenario_offset = zlib.crc32(scenario_name.encode("utf-8")) % 10000
    cfg["SEED"] = seed + scenario_offset  # Unique seed per scenario
    np.random.seed(cfg["SEED"])
    random.seed(cfg["SEED"])
    tasks = generate_tasks(cfg["NUM_TASKS"], cfg["RUNTIME_MINUTES"], cfg["SEED"], pattern=cfg.get("PATTERN"))
    vms = generate_vms(cfg["NUM_VMS"], cfg["SEED"], pattern=cfg.get("PATTERN"))

    def _missed_deadline(t):
        # Tasks without a deadline carry None or NaN; neither counts as a miss.
        if t.deadline is None or np.isnan(t.deadline):
            return False
        return t.finish_time > t.deadline

    results = []
    for sname, Scls in SCHEDULERS.items():
        # Simulator construction reseeds NumPy (via NetworkModel); reseed the
        # stdlib RNG too so a scheduler's draws do not depend on run order.
        random.seed(cfg["SEED"])
        sim = Simulator(tasks, vms, Scls, cfg)
        sim.run()
        # The simulator records a task in `completed` when it is started, with
        # its finish time already computed.
        done = [t for t in sim.completed if t.finish_time]
        if not done:
            results.append([scenario_name, sname, np.nan, np.nan, np.nan, np.nan])
            continue
        completion = [t.finish_time - t.arrival for t in done]
        act = np.mean(completion)
        makespan = max(t.finish_time for t in done) - min(t.start_time for t in done)
        p95 = np.percentile(completion, 95)
        deadline_miss = np.mean([1 if _missed_deadline(t) else 0 for t in done])
        results.append([scenario_name, sname, act, makespan, p95, deadline_miss])
    return pd.DataFrame(results, columns=["scenario", "scheduler", "ACT", "makespan", "p95", "deadline_miss"])

# Sanity: run all schedulers once on the "heterogeneous" scenario and show the metrics.
res=run_experiment("heterogeneous",CONFIG,seed=5)
print(res)
assert not res.empty


[HybridScheduler] Model trained at t=1181.5 with 2000 samples.
[HybridScheduler] Online update at t=1309.25 (2200 samples)
[HybridScheduler] Online update at t=1431.0 (2400 samples)
[HybridScheduler] Online update at t=1548.25 (2600 samples)
[HybridScheduler] Online update at t=1674.5 (2800 samples)
[HybridScheduler] Online update at t=1799.0 (3000 samples)
        scenario        scheduler       ACT     makespan        p95  \
0  heterogeneous           Random  7.466872  1813.648103  27.575907   
1  heterogeneous             FCFS  4.395901  1854.014036  17.857155   
2  heterogeneous       RoundRobin  7.518895  1854.151337  27.331590   
3  heterogeneous           MinMin  2.439951  1853.883388   6.789912   
4  heterogeneous           MaxMin  9.865537  1833.374952  31.772320   
5  heterogeneous  Proposed Method  2.158059  1801.233032   5.630134   

   deadline_miss  
0       0.009667  
1       0.004000  
2       0.008333  
3       0.000667  
4       0.011000  
5       0.000000  


In [None]:
# ===============================
# 9) Batch Experiments Across Scenarios
# ===============================

# Run every scenario with several seeds and stack the per-run result frames.
all_results=[]
for sc in SCENARIOS:
    for seed in range(5): # 5 seeds per scenario; lower this for a quick demo
        df=run_experiment(sc,CONFIG,seed)
        all_results.append(df)
all_results=pd.concat(all_results,ignore_index=True)

# Mean and standard deviation across seeds for every metric, per (scenario, scheduler).
agg=all_results.groupby(["scenario","scheduler"]).agg(["mean","std"])
print(agg)




[HybridScheduler] Model trained at t=1168.0 with 2000 samples.
[HybridScheduler] Online update at t=1287.0 (2200 samples)
[HybridScheduler] Online update at t=1422.0 (2400 samples)
[HybridScheduler] Online update at t=1551.0 (2600 samples)
[HybridScheduler] Online update at t=1673.25 (2800 samples)
[HybridScheduler] Online update at t=1799.75 (3000 samples)
[HybridScheduler] Model trained at t=1178.25 with 2000 samples.
[HybridScheduler] Online update at t=1303.0 (2200 samples)
[HybridScheduler] Online update at t=1426.0 (2400 samples)
[HybridScheduler] Online update at t=1552.0 (2600 samples)
[HybridScheduler] Online update at t=1676.0 (2800 samples)
[HybridScheduler] Online update at t=1799.25 (3000 samples)
[HybridScheduler] Model trained at t=1187.0 with 2000 samples.
[HybridScheduler] Online update at t=1301.0 (2200 samples)
[HybridScheduler] Online update at t=1424.0 (2400 samples)
[HybridScheduler] Online update at t=1544.25 (2600 samples)
[HybridScheduler] Online update at t=16

In [None]:
import matplotlib.pyplot as plt

# Plot order: baseline schedulers first, "Proposed Method" last.
base_schedulers = ["FCFS", "RoundRobin", "Random", "MinMin", "MaxMin"]
proposed_scheduler = "Proposed Method"
scheduler_order = base_schedulers + [proposed_scheduler]

# Display labels for the x axis (currently an identity mapping for every scheduler).
label_map = {x: x for x in base_schedulers}
label_map[proposed_scheduler] = "Proposed Method"

metrics = ["ACT", "p95", "deadline_miss", "makespan"]

# One bar chart per (scenario, metric): mean across seeds with std as error bars.
for sc in agg.index.get_level_values('scenario').unique():
    sub = agg.loc[sc]
    # Find all actually present schedulers for this scenario
    present = list(sub.index)
    ordered = [x for x in base_schedulers if x in present]
    if proposed_scheduler in present:
        ordered.append(proposed_scheduler)  # ensure last
    # Now plot for each metric
    for metric in metrics:
        # agg has two-level columns (metric, statistic); select one statistic each.
        means = sub.loc[ordered][metric, 'mean']
        stds = sub.loc[ordered][metric, 'std']
        means.index = [label_map[x] for x in means.index]
        plt.figure(figsize=(6, 4))
        means.plot(kind="bar", yerr=stds, alpha=0.7)
        plt.ylabel(metric)
        plt.title(f"{sc} - {metric}")
        plt.tight_layout()
        fname = f"{sc}_{metric}.png"
        plt.savefig(os.path.join(CONFIG["RESULTS_DIR"], fname))
        # Close each figure so the loop does not accumulate open figures.
        plt.close()


In [None]:
    from google.colab import files
    import shutil

In [None]:
    # Colab only: zip the whole /content folder into /content.zip and download it.
    # NOTE(review): the path is hard-coded rather than taken from CONFIG["RESULTS_DIR"].
    shutil.make_archive("/content", 'zip', "/content")
    files.download("/content.zip")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# ===============================
# 10) Statistical Significance
# ===============================

def significance_tests(all_results,baseline="Random",target="Hybrid"):
    """Paired significance tests of ``target`` against ``baseline`` per scenario/metric.

    For every scenario present in ``all_results`` and each of ACT, p95 and
    deadline_miss, the paired differences are checked for normality
    (Shapiro-Wilk, needs at least 3 pairs). A paired t-test is used when
    normality is not rejected, otherwise a Wilcoxon signed-rank test.

    Samples are paired by row order within each scheduler, so the rows of both
    schedulers must be in the same run/seed order.

    Args:
        all_results: frame with columns ``scenario, scheduler`` and the metrics.
        baseline: scheduler label used as reference.
        target: scheduler label under test. The legacy default ``"Hybrid"``
            falls back to ``"Proposed Method"`` when only that label is present
            (it is the label the evaluation cell uses for the hybrid scheduler).

    Returns:
        pandas.DataFrame with columns ``scenario, metric, test, pval, effect``;
        ``effect`` is the mean difference divided by the baseline's standard
        deviation (NaN when that deviation is zero).
    """
    labels = set(all_results["scheduler"].unique())
    # Without this fallback the default target matched no row and the function
    # silently returned an empty table.
    if target == "Hybrid" and target not in labels and "Proposed Method" in labels:
        target = "Proposed Method"

    out=[]
    # Iterate over the scenarios actually present instead of a global registry.
    for sc in all_results["scenario"].unique():
        sub=all_results[all_results["scenario"]==sc]
        for metric in ["ACT","p95","deadline_miss"]:
            base=sub.loc[sub["scheduler"]==baseline,metric].to_numpy(dtype=float)
            tgt=sub.loc[sub["scheduler"]==target,metric].to_numpy(dtype=float)
            if len(base)!=len(tgt) or len(base)==0: continue
            diff=tgt-base
            if not np.any(diff):
                # Identical samples: both tests are undefined; report no difference.
                test,p="identical",1.0
            else:
                # Shapiro-Wilk requires n >= 3; with fewer pairs assume normality.
                p_norm=stats.shapiro(diff)[1] if len(diff)>=3 else 1.0
                if p_norm>0.05:
                    _,p=stats.ttest_rel(tgt,base)
                    test="t-test"
                else:
                    _,p=stats.wilcoxon(tgt,base)
                    test="wilcoxon"
            base_sd=np.std(base)
            eff=(np.mean(tgt)-np.mean(base))/base_sd if base_sd>0 else np.nan
            out.append([sc,metric,test,p,eff])
    return pd.DataFrame(out,columns=["scenario","metric","test","pval","effect"])

# Run the tests with the function's default baseline/target and persist the table.
sig=significance_tests(all_results)
print(sig)
sig.to_csv(os.path.join(CONFIG["RESULTS_DIR"],"table_significance.csv"),index=False)

# LaTeX table
with open(os.path.join(CONFIG["RESULTS_DIR"],"table_significance.tex"),"w") as f:
    f.write(sig.to_latex(index=False,float_format="%.3f"))


In [None]:
# ===============================
# 11) Ablation & Robustness
# ===============================

class HybridNoCluster(HybridScheduler):
    """Ablation variant that maps every VM to a single cluster.

    NOTE(review): the HybridScheduler defined in this notebook neither defines
    nor calls ``cluster_vms``, so this override presumably has no effect on
    scheduling; confirm against the intended clustering design.
    """

    def cluster_vms(self):  # override to disable clustering: all VMs one cluster
        """Return ``{vm_id: 0}`` for every VM known to the simulator."""
        return dict.fromkeys(self.sim.vms, 0)

class ClusterGreedyScheduler(BaseScheduler):
    """Ablation baseline: clustering only, no runtime prediction.

    VMs are grouped once at construction time by k-means on their hardware
    attributes. Each queued task is then sent to a random free VM of the
    cluster that currently has the most free VMs.
    """

    def __init__(self, sim, config):
        super().__init__(sim, config)
        # The previous implementation called HybridScheduler.cluster_vms, which
        # does not exist in this notebook and raised AttributeError.
        self.vm_clusters = self._cluster_vms(config.get("NUM_VM_CLUSTERS", 3))

    def _cluster_vms(self, n_clusters=3):
        """Return ``{vm_id: cluster_label}`` from k-means on standardised VM features."""
        vms = list(self.sim.vms.values())
        if not vms:
            return {}
        k = max(1, min(int(n_clusters), len(vms)))
        if k == 1:
            return {vm.vm_id: 0 for vm in vms}
        feats = np.array(
            [[vm.cores, vm.cpu_ghz, vm.mem_gb, np.log1p(vm.disk_iops)] for vm in vms],
            dtype=float,
        )
        labels = KMeans(
            n_clusters=k, n_init=10, random_state=self.config["SEED"]
        ).fit_predict(StandardScaler().fit_transform(feats))
        return {vm.vm_id: int(label) for vm, label in zip(vms, labels)}

    def schedule(self, now):
        """Place queued tasks in queue order until no free VM remains."""
        free_vms = [vm for vm in self.sim.vms.values() if vm.is_free(now)]
        if not free_vms or not self.sim.queue:
            return
        for task in list(self.sim.queue):
            if not free_vms:
                break
            # Pick VM cluster via simple heuristic: the one with most free VMs.
            vm_cluster_counts = {}
            for vm in free_vms:
                c = self.vm_clusters[vm.vm_id]
                vm_cluster_counts[c] = vm_cluster_counts.get(c, 0) + 1
            best_cluster = max(vm_cluster_counts, key=vm_cluster_counts.get)
            candidate_vms = [vm for vm in free_vms if self.vm_clusters[vm.vm_id] == best_cluster]
            vm = random.choice(candidate_vms)
            started = self.sim.start_task(task, vm, now)
            # Keep the pool current so later tasks are not sent to a taken VM.
            free_vms.remove(vm)
            if started:
                self.sim.queue.remove(task)

class HybridNoUpdates(HybridScheduler):
    """Ablation variant: offline only, the model is fit once and never refit.

    Warm-up, the initial fit and the VM scoring are inherited unchanged from
    HybridScheduler, so the only difference from the parent is the missing
    online refit. The previous override read ``self.warmup``, which the parent
    never sets (AttributeError on the first tick), and scored VMs without the
    transfer time the parent includes.
    """

    def __init__(self, sim, config):
        super().__init__(sim, config)
        # The parent refits when ``len(history) % online_update_freq == 0``.
        # With an infinite period that remainder is never zero for a non-empty
        # history, so online updates are effectively switched off.
        self.online_update_freq = float("inf")

class HybridLinearSGD(HybridScheduler):
    """Ablation variant that swaps the gradient-boosting model for a linear SGD regressor."""
    def __init__(self, sim, config):
        # Let the parent set up scaler/history first, then replace its model.
        super().__init__(sim, config)
        self.model = SGDRegressor(max_iter=1000, tol=1e-3, random_state=config["SEED"])
        # NOTE(review): HybridScheduler as defined in this notebook never reads
        # `warmup`; confirm whether anything else depends on this flag.
        self.warmup = True

# Robustness: Inject sudden latency increase and VM throttling

def inject_concept_drift(sim: Simulator, drift_time=900):
    """Register a one-shot event that degrades the environment at ``drift_time``.

    When the simulation clock reaches ``drift_time``, every base network
    latency is tripled and every VM's clock speed is halved. Must be called
    before ``sim.run()``; it only registers the process on ``sim.env``.
    """
    def drift(env):
        yield env.timeout(drift_time)
        network = sim.net_model
        tripled = {}
        for location, latency in network.base_latency.items():
            tripled[location] = latency*3
        network.base_latency = tripled
        for machine in sim.vms.values():
            machine.cpu_ghz = machine.cpu_ghz * 0.5  # simulate throttling
        print(f"[{env.now:.2f}] Concept drift injected: latency x3, VM CPU halved")

    sim.env.process(drift(sim.env))

# Ablation experiments run function

def run_ablation_experiments(tasks_df, vms_df, config, seeds=(1, 2)):
    """Run every ablation variant once per seed on the given workload.

    Args:
        tasks_df, vms_df: workload frames passed to each Simulator.
        config: base configuration; ``config["ROBUSTNESS"]`` enables the
            concept-drift injection. The dict itself is not modified.
        seeds: iterable of seeds; each one replaces ``config["SEED"]`` for
            that repetition.

    Returns:
        pandas.DataFrame with columns ``seed, scheduler, ACT, deadline_miss``
        (NaN metrics for a variant that started no task).
    """
    ablations = {
        "HybridNoCluster": HybridNoCluster,
        "ClusterGreedy": ClusterGreedyScheduler,
        "HybridNoUpdates": HybridNoUpdates,
        "HybridLinearSGD": HybridLinearSGD,
        "Hybrid": HybridScheduler,
    }

    def _missed_deadline(t):
        # Tasks without a deadline carry None or NaN; neither counts as a miss.
        if t.deadline is None or np.isnan(t.deadline):
            return False
        return t.finish_time > t.deadline

    results = []
    for seed in seeds:
        # The simulator's NetworkModel reseeds NumPy with config["SEED"], which
        # used to overwrite the loop seed and made every repetition identical.
        # Passing the seed through the config makes it take effect.
        seed_cfg = dict(config, SEED=seed)
        for name, cls in ablations.items():
            np.random.seed(seed)
            random.seed(seed)
            sim = Simulator(tasks_df, vms_df, cls, seed_cfg)
            # Inject concept drift in robustness scenario
            if seed_cfg.get("ROBUSTNESS", False):
                inject_concept_drift(sim)
            sim.run()
            done = [t for t in sim.completed if t.finish_time]
            if not done:
                results.append([seed, name, np.nan, np.nan])
                continue
            act = np.mean([t.finish_time - t.arrival for t in done])
            deadline_miss = np.mean([1 if _missed_deadline(t) else 0 for t in done])
            results.append([seed, name, act, deadline_miss])
    return pd.DataFrame(results, columns=["seed", "scheduler", "ACT", "deadline_miss"])

# Example robustness run (small scale): a copy of CONFIG with drift injection enabled.
robust_cfg = CONFIG.copy()
robust_cfg.update({"NUM_TASKS":2000, "NUM_VMS":30, "RUNTIME_MINUTES":20, "ROBUSTNESS": True})

# The workload is a prefix of the notebook-wide frames rather than a fresh draw.
ablation_df = run_ablation_experiments(tasks_df.head(robust_cfg["NUM_TASKS"]),
                                      vms_df.head(robust_cfg["NUM_VMS"]),
                                      robust_cfg)
# Average each variant's metrics over the seeds.
print(ablation_df.groupby("scheduler")[["ACT", "deadline_miss"]].mean())


In [None]:
# ===============================
# 12) Complexity & Throughput (Fixed)
# ===============================

import timeit

def scheduler_latency_benchmark(scheduler_cls, tasks_df, vms_df, config, n_trials=5):
    """Time one ``schedule()`` call on a fully loaded queue.

    Each trial builds a fresh simulator, places every task in the ready queue
    and times a single scheduling decision pass at simulated time 0. The SimPy
    environment is never stepped. A freshly built HybridScheduler has no
    trained model yet, so it is timed on its warm-up path.

    Returns:
        ``(mean_ms, std_ms)`` over ``n_trials`` trials.
    """
    times = []
    for _ in range(n_trials):
        sim = Simulator(tasks_df, vms_df, scheduler_cls, config)
        # Without stepping the environment no task ever arrives, so the queue
        # was empty and the timing only measured an early return. Enqueue all
        # tasks so there is real work to time.
        sim.queue.extend(sim.tasks)
        now = 0.0  # Starting simulated time
        # Call schedule but do NOT step the SimPy env. We only want to time the decision function
        start = timeit.default_timer()
        sim.scheduler.schedule(now)
        duration = (timeit.default_timer() - start) * 1000  # ms
        times.append(duration)
    return np.mean(times), np.std(times)


# Benchmark three schedulers on the same small workload slice;
# each entry maps scheduler name -> (mean, std) of the decision latency in ms.
benchmark_results = {}
for sched_name, sched_cls in [("Hybrid", HybridScheduler), ("FCFS", FCFS), ("RoundRobin", RoundRobin)]:
    mean_t, std_t = scheduler_latency_benchmark(
        sched_cls,
        tasks_df.head(500), vms_df.head(50), CONFIG)
    benchmark_results[sched_name] = (mean_t, std_t)

print("Scheduler decision latency (ms):", benchmark_results)

# Display complexity notes as markdown

from IPython.display import Markdown, display

# Inline math is written with $...$ delimiters. The previous \( ... \) form is
# expected to be consumed by the Markdown pass (backslash escapes) before the
# math renderer runs, leaving plain parentheses instead of typeset math.
display(Markdown(r"""
## Time Complexity of Core Scheduling Steps

- **Clustering lookup:** $O(1)$ per VM after pre-computed labels
- **Candidate VM filtering:** $O(|V|)$ per scheduling tick
- **Model inference:** $O(|V| \cdot f)$ where $f$ = feature dimensions (small)
- **Candidate scoring & selection:** $O(|V|)$

Overall complexity scales linearly with the number of VMs per scheduling tick, suitable for real-time scheduling in hybrid environments.
"""))


In [None]:
# ===============================
# 13) Reproducibility & Artifacts
# ===============================

import json

# Re-seed NumPy's and Python's global RNGs with the configured seed so that
# any random draws made from this cell onward start from a known state.
np.random.seed(CONFIG["SEED"])
random.seed(CONFIG["SEED"])

def save_df(df, filename):
    """Write ``df`` as a CSV file (index omitted) into the results directory.

    Args:
        df: DataFrame to persist.
        filename: File name, joined onto ``CONFIG["RESULTS_DIR"]``.
    """
    results_dir = CONFIG["RESULTS_DIR"]
    path = os.path.join(results_dir, filename)
    df.to_csv(path, index=False)
    print(f"Saved CSV: {path}")

def save_json(data, filename):
    """Serialise ``data`` as indented JSON into the results directory.

    Args:
        data: JSON-serialisable object.
        filename: File name, joined onto ``CONFIG["RESULTS_DIR"]``.
    """
    results_dir = CONFIG["RESULTS_DIR"]
    path = os.path.join(results_dir, filename)
    with open(path, "w") as out:
        json.dump(data, out, indent=2)
    print(f"Saved JSON: {path}")

def save_fig(fig, filename):
    """Save ``fig`` twice into the results directory: PNG at 300 dpi and PDF.

    Args:
        fig: Figure object exposing ``savefig``.
        filename: Base file name without extension; ``.png`` and ``.pdf`` are
            appended and the result is joined onto ``CONFIG["RESULTS_DIR"]``.
    """
    results_dir = CONFIG["RESULTS_DIR"]
    path_png = os.path.join(results_dir, filename + ".png")
    path_pdf = os.path.join(results_dir, filename + ".pdf")

    fig.savefig(path_png, dpi=300)  # raster copy
    fig.savefig(path_pdf)           # vector copy
    print(f"Saved figures: {path_png} and {path_pdf}")

# Persist the active configuration next to the results as experiment metadata.
save_json(CONFIG, "config.json")

# Usage examples for the other helpers, intentionally left commented out:
# call them once a results DataFrame / figure exists (i.e. after the experiment runs).
# save_df(all_results, "aggregate_results.csv")
# save_fig(plt.gcf(), "example_plot")



In [None]:
# ===============================
# API / Code Structure (Classes with docstrings & type hints)
# ===============================

from typing import List, Dict, Optional

class Task:
    """Representation of a submitted compute task.

    Attributes:
        task_id: Unique task identifier.
        arrival: Arrival time in seconds.
        cpu_req: Required CPU cycles.
        mem_req: Memory requirement in GB.
        io_intensity: IO intensity [0,1].
        input_mb: Input data size in MB.
        deadline: Optional deadline time.
        ilp: Instruction-level parallelism factor.
        type: Task type label e.g., batch, interactive, ml.
        wait_time: Time spent waiting in queue.
        start_time: Actual start time of execution.
        finish_time: Completion time.
        vm_id: Assigned VM.
        transfer_time: Data-transfer time; ``None`` until set by the code that
            runs the task. Read by ``run_all_experiments`` when exporting
            per-task records.
        compute_time: Execution time on the assigned VM; ``None`` until set by
            the code that runs the task. Read by ``run_all_experiments`` for
            the cost/energy proxies.
    """
    def __init__(self, data: pd.Series):
        """Copy the static task description from one row of the task table.

        Args:
            data: Row exposing ``task_id``, ``arrival``, ``cpu_req``,
                ``mem_req``, ``io_intensity``, ``input_mb``, ``deadline``,
                ``ilp`` and ``type`` as attributes.
        """
        # Static description, taken verbatim from the input row.
        self.task_id: int = data.task_id
        self.arrival: float = data.arrival
        self.cpu_req: float = data.cpu_req
        self.mem_req: float = data.mem_req
        self.io_intensity: float = data.io_intensity
        self.input_mb: float = data.input_mb
        self.deadline: Optional[float] = data.deadline
        self.ilp: float = data.ilp
        self.type: str = data.type

        # Runtime bookkeeping; unset until the task is scheduled and executed.
        self.wait_time: Optional[float] = None
        self.start_time: Optional[float] = None
        self.finish_time: Optional[float] = None
        self.vm_id: Optional[int] = None
        # Initialised here so that reporting code reading these attributes
        # never hits AttributeError on a task that has not been timed yet.
        self.transfer_time: Optional[float] = None
        self.compute_time: Optional[float] = None

class VM:
    """Representation of a Virtual Machine resource.

    Attributes:
        vm_id: Unique VM identifier.
        cores: Number of CPU cores.
        cpu_ghz: CPU frequency GHz.
        mem_gb: Memory in GB.
        disk_iops: Disk IO operations per second.
        arch: Architecture flag (x86/ARM).
        loc: Location ('edge' or 'cloud').
        busy_until: Time until which VM is busy.
    """
    def __init__(self, data: pd.Series):
        """Copy the VM description from one row of the VM table.

        Args:
            data: Row exposing ``vm_id``, ``cores``, ``cpu_ghz``, ``mem_gb``,
                ``disk_iops``, ``arch`` and ``loc``. A ``pd.Series`` or any
                object with those attributes (e.g. a namedtuple) is accepted.
        """
        self.vm_id: int = data.vm_id
        self.cores: int = data.cores
        self.cpu_ghz: float = data.cpu_ghz
        self.mem_gb: int = data.mem_gb
        self.disk_iops: float = data.disk_iops
        self.arch: str = data.arch
        # ``Series.loc`` is pandas' label-based indexer, so ``data.loc`` on a
        # Series yields the indexer object rather than the row's "loc" value.
        # Index by key for Series; keep attribute access for other records.
        self.loc: str = data["loc"] if isinstance(data, pd.Series) else data.loc

        # A new VM is idle from time 0.
        self.busy_until: float = 0.0

    def is_free(self, now: float) -> bool:
        """Return True when the VM has no work scheduled past ``now``."""
        return now >= self.busy_until

# Other classes (NetworkModel, Simulator, HybridScheduler, BaselineSchedulers) follow the same pattern as in the
# previous cells, with added type hints and docstrings.


In [None]:
# ===============================
#Final Cell: Reproduce All Experiments
# ===============================

def _summarize_finished_tasks(finished, vm_loc_of):
    """Aggregate summary metrics for one simulation run.

    Args:
        finished: Task objects whose ``finish_time`` is not None.
        vm_loc_of: Callable mapping a task to the ``loc`` of the VM it ran on.

    Returns:
        Dict with keys ``ACT``, ``Makespan``, ``P95``, ``DeadlineMissRate``,
        ``MeanQueueWait``, ``CostProxy`` and ``EnergyProxy`` (in that order).
        When ``finished`` is empty the time-based metrics are NaN and the
        wait/cost/energy values are zero.
    """
    if not finished:
        # max()/np.percentile would raise on empty input; report NaN instead.
        return {
            "ACT": np.nan, "Makespan": np.nan, "P95": np.nan,
            "DeadlineMissRate": np.nan,
            "MeanQueueWait": 0, "CostProxy": 0.0, "EnergyProxy": 0.0,
        }

    completion_times = [t.finish_time - t.arrival for t in finished]
    queue_waits = [t.wait_time for t in finished if t.wait_time is not None]
    miss_flags = [1 if (t.deadline and t.finish_time > t.deadline) else 0 for t in finished]
    makespan = max(t.finish_time for t in finished) - min(t.start_time for t in finished)

    # Busy time split by VM location; one location lookup per task.
    cloud_busy_time = 0
    edge_busy_time = 0
    for t in finished:
        location = vm_loc_of(t)
        if location == "cloud":
            cloud_busy_time += t.compute_time
        elif location == "edge":
            edge_busy_time += t.compute_time

    # Cost proxy: cloud VMs cost more (arbitrary weights).
    cost = cloud_busy_time * 1.5 + edge_busy_time * 1.0
    # Energy proxy: watts * busy time (simplified).
    energy = cloud_busy_time * 120 + edge_busy_time * 50

    return {
        "ACT": np.mean(completion_times),
        "Makespan": makespan,
        "P95": np.percentile(completion_times, 95),
        "DeadlineMissRate": np.mean(miss_flags),
        "MeanQueueWait": np.mean(queue_waits) if queue_waits else 0,
        "CostProxy": cost,
        "EnergyProxy": energy,
    }


def run_all_experiments(config, scenarios, schedulers, seeds=(42, 43, 44, 45, 46)):
    """Run every scenario x seed x scheduler combination and collect metrics.

    For each scenario the base ``config`` is copied and overridden with the
    scenario's settings. For each seed the global NumPy/Python RNGs are
    re-seeded and a task/VM workload is generated once and shared by all
    schedulers. Each run writes a per-task CSV named
    ``results_{scenario}_{scheduler}_seed{seed}.csv``; a final
    ``aggregate_results.csv`` holds one summary row per run. Both go to
    ``config["RESULTS_DIR"]``.

    Args:
        config: Base configuration mapping; must contain ``RESULTS_DIR`` and,
            after scenario overrides, ``NUM_TASKS``, ``RUNTIME_MINUTES`` and
            ``NUM_VMS``.
        scenarios: Mapping of scenario name -> config overrides.
        schedulers: Mapping of scheduler name -> scheduler class.
        seeds: Iterable of integer seeds (immutable default avoids the
            shared-mutable-default pitfall).

    Returns:
        DataFrame with one row per (scenario, scheduler, seed) and columns
        scenario, scheduler, seed, ACT, Makespan, P95, DeadlineMissRate,
        MeanQueueWait, CostProxy, EnergyProxy.
    """
    all_results = []
    for scenario_name in scenarios:
        print(f"=== Running scenario: {scenario_name} ===")
        sc_cfg = config.copy()
        sc_cfg.update(scenarios[scenario_name])
        for seed in seeds:
            print(f"Running seed {seed}...")
            np.random.seed(seed)
            random.seed(seed)
            tasks = generate_tasks(sc_cfg["NUM_TASKS"], sc_cfg["RUNTIME_MINUTES"], seed)
            vms = generate_vms(sc_cfg["NUM_VMS"], seed)

            for sched_name, sched_cls in schedulers.items():
                print(f"  Scheduler: {sched_name}")
                sim = Simulator(tasks, vms, sched_cls, sc_cfg)
                sim.run()

                # Use an explicit None check: a truthiness test would also drop
                # tasks that legitimately finished at time 0.0.
                finished = [t for t in sim.completed if t.finish_time is not None]
                if not finished:
                    print("    Warning: no finished tasks; recording NaN metrics for this run.")
                metrics = _summarize_finished_tasks(
                    finished, lambda t, _sim=sim: _sim.vms[t.vm_id].loc
                )

                # Save task-level records CSV
                records = [
                    {
                        "task_id": t.task_id,
                        "scheduler": sched_name,
                        "scenario": scenario_name,
                        "seed": seed,
                        "wait_time": t.wait_time,
                        "start_time": t.start_time,
                        "finish_time": t.finish_time,
                        "vm_id": t.vm_id,
                        "vm_loc": sim.vms[t.vm_id].loc,
                        "transfer_time": t.transfer_time,
                        "compute_time": t.compute_time,
                        "deadline": t.deadline,
                    }
                    for t in sim.completed
                ]
                pd.DataFrame(records).to_csv(
                    os.path.join(config["RESULTS_DIR"],
                                 f"results_{scenario_name}_{sched_name}_seed{seed}.csv"),
                    index=False,
                )

                # Accumulate summary results (identifier columns first, then metrics)
                all_results.append({
                    "scenario": scenario_name, "scheduler": sched_name, "seed": seed,
                    **metrics,
                })

            print(f"Completed seed {seed} for scenario {scenario_name}.\n")

    df_all = pd.DataFrame(all_results)
    df_all.to_csv(os.path.join(config["RESULTS_DIR"], "aggregate_results.csv"), index=False)
    print("Saved aggregate results CSV.")
    return df_all


# Execute every scenario/scheduler combination using the default seeds
df_experiments = run_all_experiments(CONFIG, SCENARIOS, SCHEDULERS)

# One bar chart per scenario: mean ACT per scheduler, error bars = std over seeds
import matplotlib.pyplot as plt
for sc in SCENARIOS:
    df_sc = df_experiments[df_experiments["scenario"] == sc]
    act_by_scheduler = df_sc.groupby("scheduler")["ACT"]
    means = act_by_scheduler.mean()
    stds = act_by_scheduler.std()

    fig, ax = plt.subplots(figsize=(8, 4))
    means.plot(
        kind="bar",
        yerr=stds,
        capsize=4,
        title=f"Average Completion Time for {sc} scenario",
        ax=ax,
    )
    ax.set_ylabel("Seconds")
    fig.tight_layout()
    fig.savefig(os.path.join(CONFIG["RESULTS_DIR"], f"bar_act_{sc}.png"))
    plt.close(fig)  # release the figure so repeated scenarios do not pile up

print("All experiments completed, figures saved to:", CONFIG["RESULTS_DIR"])


In [None]:
    # Bundle the results directory into a zip archive and, when running on
    # Google Colab, offer it as a browser download.
    import shutil
    import tempfile

    # Archive CONFIG["RESULTS_DIR"] (not a hard-coded /content) and build the
    # zip outside that directory so the archive can never contain itself.
    archive_path = shutil.make_archive(
        os.path.join(tempfile.gettempdir(), "my_files"), "zip", CONFIG["RESULTS_DIR"]
    )
    print(f"Created archive: {archive_path}")

    try:
        from google.colab import files
    except ImportError:
        # Not on Colab: keep the cell runnable under Restart & Run All.
        print("google.colab is not available; skipping browser download.")
    else:
        files.download(archive_path)