In [8]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# List every file shipped under the read-only Kaggle input directory,
# one full path per line (nothing is printed if the directory is absent).
for root, _subdirs, files in os.walk('/kaggle/input'):
    for path in (os.path.join(root, name) for name in files):
        print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [9]:
# Install PyTorch (plus torchvision / torchaudio).
# NOTE(review): unpinned, so the torch that gets installed may not match the
# torch-2.0.0+cpu wheel index used for the PyG extensions in the next cell.
# torchvision / torchaudio appear unused in this notebook — confirm before keeping.
!pip install torch torchvision torchaudio



In [10]:
# Install PyTorch Geometric and the compiled torch-scatter / torch-sparse
# extensions, looking up prebuilt CPU wheels built for torch 2.0.0.
# NOTE(review): those extension wheels are built against torch 2.0.0 and are
# not expected to load against a different torch version; pin torch to match,
# and prefer %pip so the kernel's own environment is the install target.
!pip install torch-geometric torch-scatter torch-sparse -f https://data.pyg.org/whl/torch-2.0.0+cpu.html

Looking in links: https://data.pyg.org/whl/torch-2.0.0+cpu.html


In [12]:
import random
import networkx as nx
import torch
from torch_geometric.data import HeteroData

def generate_random_workflow(num_tasks=5, num_vms=3, seed=None):
    """Build a random task-to-VM scheduling instance as a PyG ``HeteroData``.

    Node stores:
        ``task.x``: ``[num_tasks, 2]`` float, columns ``[duration, data_size]``
            with duration drawn from [5, 20] and data_size from [1, 10].
        ``task.y``: ``[num_tasks]`` long, index of the VM with the smallest
            ``duration / cpu`` estimate (the first such VM on ties).
        ``vm.x``: ``[num_vms, 2]`` float, columns ``[cpu, mem]`` with cpu drawn
            from [1, 4] and mem from [4, 16].

    Edge stores:
        ``('task', 'to', 'task').edge_index``: ``[2, E]`` dependency edges of a
            random ``nx.gn_graph`` (each new node links to one earlier node, so
            the graph is acyclic).
        ``('task', 'assign', 'vm').edge_index``: ``[2, num_tasks * num_vms]``,
            every task paired with every VM in task-major order.
        ``('task', 'assign', 'vm').edge_attr``: ``[num_tasks * num_vms, 1]``,
            the naive ``duration / cpu`` estimate for each pair above.

    Args:
        num_tasks: number of task nodes; must be >= 1.
        num_vms: number of VM nodes; must be >= 1.
        seed: optional seed for a private ``random.Random`` so an instance can
            be reproduced. ``None`` (default) draws from the global ``random``
            module.

    Returns:
        The populated ``HeteroData`` object.

    Raises:
        ValueError: if ``num_tasks`` or ``num_vms`` is less than 1.
    """
    if num_tasks < 1 or num_vms < 1:
        raise ValueError(
            f"num_tasks and num_vms must be >= 1, got {num_tasks} and {num_vms}"
        )
    rng = random if seed is None else random.Random(seed)

    # Dependency DAG. Its seed is drawn first, then task features, then VM
    # features, so the sequence of random draws is fixed for a given state.
    dag = nx.gn_graph(num_tasks, seed=rng.randint(1, 1000))

    # One [duration, data_size] row per DAG node (list items evaluate left to
    # right, so duration is drawn before data_size).
    task_x = torch.tensor(
        [[rng.randint(5, 20), rng.randint(1, 10)] for _ in dag.nodes],
        dtype=torch.float,
    )
    vm_x = torch.tensor(
        [[rng.randint(1, 4), rng.randint(4, 16)] for _ in range(num_vms)],
        dtype=torch.float,
    )

    # reshape(-1, 2) keeps the [2, E] layout even with no edges (num_tasks == 1);
    # .t() on the empty 1-D tensor alone would give shape [0].
    edge_index_task = (
        torch.tensor(list(dag.edges), dtype=torch.long).reshape(-1, 2).t().contiguous()
    )

    # All-to-all candidate edges in task-major order: (0,0), (0,1), ..., (1,0), ...
    edge_index_assign = torch.stack([
        torch.arange(num_tasks).repeat_interleave(num_vms),
        torch.arange(num_vms).repeat(num_tasks),
    ])

    # est_time[t, v] = duration_t / cpu_v; flattening row-major lines row k of
    # edge_attr up with column k of edge_index_assign.
    est_time = task_x[:, 0:1] / vm_x[:, 0]
    edge_attr = est_time.reshape(-1, 1)

    # Label = VM with the smallest estimate; argmin returns the first index on
    # ties. NOTE(review): duration is constant along a task's row, so this is
    # always the first highest-CPU VM and every task gets the same label.
    y = est_time.argmin(dim=1)

    data = HeteroData()
    data['task'].x = task_x
    data['vm'].x = vm_x
    data['task', 'to', 'task'].edge_index = edge_index_task
    data['task', 'assign', 'vm'].edge_index = edge_index_assign
    data['task', 'assign', 'vm'].edge_attr = edge_attr
    data['task'].y = y

    return data


In [13]:
# Draw one sample scheduling instance and print its HeteroData summary
# (node/edge types together with their tensor shapes).
data = generate_random_workflow(
    num_vms=3,
    num_tasks=6,
)
print(data)

HeteroData(
  task={
    x=[6, 2],
    y=[6],
  },
  vm={ x=[3, 2] },
  (task, to, task)={ edge_index=[2, 5] },
  (task, assign, vm)={
    edge_index=[2, 18],
    edge_attr=[18, 1],
  }
)


In [18]:
# Everything the model needs is imported here; `nn`, `HeteroConv` and `GCNConv`
# were otherwise undefined on a fresh kernel.
from torch import nn
from torch_geometric.nn import GCNConv, HeteroConv, SAGEConv


class SchedulerGNN(nn.Module):
    """Heterogeneous GNN that emits one logit per VM slot for every task node.

    A single ``HeteroConv`` layer updates ``task`` nodes from their task
    dependencies (``GCNConv``) and ``vm`` nodes from incoming assignment edges
    (``SAGEConv``). The task embeddings then go through a two-layer MLP head.

    NOTE(review): ``forward`` only reads ``x_dict['task']``, and no conv routes
    messages from ``vm`` to ``task``. Task logits therefore never see VM
    features, and the VM embeddings computed by the assignment conv are
    discarded.

    Args:
        hidden_channels: width of the conv output and of the hidden MLP layer.
        num_vms: number of output logits (VM slots) per task. Defaults to 3.
    """

    def __init__(self, hidden_channels, num_vms=3):
        super().__init__()
        # In-channel sizes of -1 are inferred lazily on the first forward pass.
        self.conv1 = HeteroConv({
            ('task', 'to', 'task'): GCNConv(-1, hidden_channels),
            ('task', 'assign', 'vm'): SAGEConv((-1, -1), hidden_channels),
        }, aggr='sum')

        self.lin_task = nn.Linear(hidden_channels, hidden_channels)
        self.out = nn.Linear(hidden_channels, num_vms)

    def forward(self, x_dict, edge_index_dict):
        """Return unnormalised logits of shape ``[num_task_nodes, num_vms]``."""
        x_dict = self.conv1(x_dict, edge_index_dict)
        # ReLU between layers: without a nonlinearity, lin_task followed by out
        # collapses into a single linear map.
        task_repr = self.lin_task(x_dict['task'].relu()).relu()
        return self.out(task_repr)

In [37]:
# Seed both RNGs drawn from here (the workflow generator uses `random`,
# parameter initialisation uses torch) so the run is reproducible.
SEED = 0
random.seed(SEED)
torch.manual_seed(SEED)

data = generate_random_workflow(num_tasks=6, num_vms=3)
model = SchedulerGNN(hidden_channels=32)

# Materialise the lazily-sized (-1 in_channels) conv weights with one dry
# forward pass before handing the parameters to the optimizer.
with torch.no_grad():
    model(data.x_dict, data.edge_index_dict)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_fn = torch.nn.CrossEntropyLoss()

# NOTE(review): training and accuracy use the same single graph, so Acc here
# measures memorisation, not generalisation to unseen workflows.
for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    out = model(data.x_dict, data.edge_index_dict)
    loss = loss_fn(out, data['task'].y)
    loss.backward()
    optimizer.step()
    if epoch % 10 == 0:
        # Accuracy of this epoch's predictions (computed before the step).
        acc = (out.argmax(dim=1) == data['task'].y).float().mean().item()
        print(f"Epoch {epoch}, Loss: {loss.item():.4f}, Acc: {acc:.2f}")


Epoch 0, Loss: 2.5107, Acc: 0.00
Epoch 10, Loss: 0.0000, Acc: 1.00
Epoch 20, Loss: 0.0000, Acc: 1.00
Epoch 30, Loss: 0.0000, Acc: 1.00
Epoch 40, Loss: 0.0000, Acc: 1.00
Epoch 50, Loss: 0.0000, Acc: 1.00
Epoch 60, Loss: 0.0000, Acc: 1.00
Epoch 70, Loss: 0.0000, Acc: 1.00
Epoch 80, Loss: 0.0000, Acc: 1.00
Epoch 90, Loss: 0.0000, Acc: 1.00


In [38]:
def simulate_makespan(assignments, durations, vm_count=10, vm_speeds=None):
    """Return the makespan (heaviest per-VM load) of a task-to-VM assignment.

    Each VM runs its tasks back to back and task dependencies are ignored, so
    the makespan is simply the largest per-VM total.

    Args:
        assignments: VM index for each task; ``assignments[i]`` is task ``i``'s
            VM. Every index must lie in ``range(vm_count)``.
        durations: duration of each task, aligned with ``assignments``.
        vm_count: number of VM slots; unused slots have load 0.
        vm_speeds: optional per-VM speed factors (e.g. CPU counts). When given,
            task ``i`` contributes ``durations[i] / vm_speeds[vm]`` to its VM;
            when ``None`` (default) the raw durations are summed.

    Returns:
        The maximum per-VM load (0 when there are no VM slots).

    Raises:
        ValueError: if ``assignments`` and ``durations`` differ in length.
        IndexError: if an assignment is outside ``range(vm_count)``. Negative
            indices are rejected rather than silently wrapping to the last VMs.
    """
    if len(assignments) != len(durations):
        raise ValueError(
            f"got {len(assignments)} assignments but {len(durations)} durations"
        )

    vm_time = [0] * vm_count
    for task, (vm, duration) in enumerate(zip(assignments, durations)):
        if not 0 <= vm < vm_count:
            raise IndexError(
                f"task {task} is assigned to VM {vm}, outside range(0, {vm_count})"
            )
        vm_time[vm] += duration if vm_speeds is None else duration / vm_speeds[vm]
    return max(vm_time, default=0)

# Compare the load-based makespan of the model's assignment with that of the
# ground-truth labels. The labels are the per-task fastest VM (a greedy rule),
# not a makespan-optimal schedule, so they are reported as "Label".
# Loads are sums of raw task durations per VM; CPU speed is not factored in here.
num_vms = data['vm'].x.size(0)
durations = [int(t[0].item()) for t in data['task'].x]

model.eval()
with torch.no_grad():  # inference only: no autograd graph needed
    pred = model(data.x_dict, data.edge_index_dict).argmax(dim=1).tolist()
true = data['task'].y.tolist()

print("Predicted Makespan:", simulate_makespan(pred, durations, vm_count=num_vms))
print("Label Makespan    :", simulate_makespan(true, durations, vm_count=num_vms))


Predicted Makespan: 65
Optimal Makespan  : 65
