In [101]:
from copy import copy
import gym
import gym_dagsched
import numpy as np
import networkx as nx
from gym.spaces import Dict, Tuple, MultiBinary, Discrete, Box
from dataclasses import asdict
import plotly.express as px
import pandas as pd

env = gym.make(
    'dagsched-v0',
    max_jobs=10,        
    max_stages=8,      
    max_tasks=4,       
    n_worker_types=3,
    n_workers=8
)

In [102]:
obs = env.reset()
done = False

while not done:    
    frontier_stages = obs.get_frontier_stages()
    avail_workers = obs.find_available_workers()
    
    if len(avail_workers) == 0 or len(frontier_stages) == 0:
        if len(avail_workers) == 0:
            print('no available workers')
        else:
            print('frontier is empty')
        action = {
            'job_id': env.max_jobs,
            'stage_id': env.max_stages,
            'worker_type_counts': np.zeros(env.n_worker_types)
        }
    else:
        for stage in frontier_stages:
            for worker in avail_workers:
                if worker.compatible_with(stage):
                    first_stage = stage
        
        stage_id = first_stage.id_
        job_id = first_stage.job_id
        
        avail_counts = obs.get_avail_worker_counts(env.n_worker_types)
        
        stage = obs.jobs[job_id].stages[stage_id]
        worker_types = stage.compatible_worker_types()
        
        n_workers = 0
        worker_type_counts = np.zeros(env.n_worker_types, dtype=int)
        for worker_type in worker_types:
            if n_workers == stage.n_remaining_tasks:
                break
            avail_count = avail_counts[worker_type]
            request_count = int(min(stage.n_remaining_tasks - n_workers, avail_count))
            worker_type_counts[worker_type] = request_count
            n_workers += request_count
        
        print(f'n_workers={n_workers}')

        action = {
            'job_id': job_id,
            'stage_id': stage_id,
            'worker_type_counts': worker_type_counts
        }
    
    print()
    prev_obs = copy(obs)
    obs, _, done, _ = env.step(action)

frontier is empty

invalid action
2.7930908280275397: job arrival
n_workers=2

2.7930908203125: nudge
n_workers=1

16.119292034443657: job arrival
n_workers=1

19.2528289939288: job arrival
n_workers=4

19.939924240112305: task completion (0,5,0)
n_workers=1

22.446532947170795: job arrival
no available workers

invalid action
22.818565368652344: task completion (0,6,0)
n_workers=1

22.818565368652344: task completion (0,6,1)
stage completion
n_workers=1

32.37398919572022: job arrival
no available workers

invalid action
37.644195556640625: task completion (1,6,0)
stage completion
n_workers=1

40.47534942626953: task completion (2,3,0)
n_workers=1

40.47534942626953: task completion (2,3,1)
n_workers=1

40.47534942626953: task completion (2,3,2)
n_workers=1

40.47534942626953: task completion (2,3,3)
stage completion
n_workers=1

42.26740646362305: task completion (1,5,0)
stage completion
n_workers=1

43.02341079711914: task completion (3,4,0)
n_workers=1

43.02341079711914: task comp

In [103]:
env.observation_space.contains(asdict(prev_obs))

True

In [104]:
tasks = []
for job in prev_obs.jobs:
    for stage in job.stages:
        for i,task in enumerate(stage.tasks):
            task_dict = {
                'worker_id': task.worker_id,
                't_accepted': task.t_accepted[0],
                't_completed': task.t_completed[0],
                'job': str(job.id_)
            }
            tasks += [task_dict]

df = pd.DataFrame(tasks)
df = df[df.t_accepted!=np.inf]
df['delta'] = df.t_completed - df.t_accepted
df

Unnamed: 0,worker_id,t_accepted,t_completed,job,delta
0,6,332.781647,353.609406,0,20.827759
1,7,332.781647,353.609406,0,20.827759
4,0,332.781647,349.718292,0,16.936646
8,0,313.588165,332.781647,0,19.193481
12,7,277.441711,300.494019,0,23.052307
...,...,...,...,...,...
308,2,82.310844,103.742615,9,21.431770
309,3,82.310844,103.742615,9,21.431770
312,0,74.722397,93.971260,9,19.248863
313,7,76.157715,95.406578,9,19.248863


In [105]:
fig = px.timeline(df, x_start="t_accepted", x_end="t_completed", y="worker_id", color='job')
for job in prev_obs.jobs:
    fig.add_vline(x=job.t_completed[0], line_width=2, line_color='green')
fig.layout.xaxis.type = 'linear'
for d in fig.data:
    d.x = df[df.job==d.name].delta.tolist()
    
fig.show()