In [None]:
%load_ext autoreload
import ddls

Initialise what type(s) of nodes (servers) you want in your cluster and what type(s) of worker(s) you want to populate them with.

In [None]:
%autoreload
from ddls.devices.processors.gpus.A100 import A100

# Node configuration later passed to the cluster environment: one entry per
# node type ('type_1' here), holding a node count and a list of worker specs
# (a worker count paired with the worker class).
node_config = {
    'type_1': {
        'num_nodes': 16,
        'workers_config': [
            {'num_workers': 4, 'worker': A100},
        ],
    },
}

print(node_config)

Initialise the topology to be populated by your nodes.

In [None]:
# Topology configuration: a 'type' string plus the keyword arguments
# ('x_dims' and 'y_dims') supplied for that topology type.
topology_config = {
    'type': 'torus',
    'kwargs': {'x_dims': 4, 'y_dims': 4},
}

Initialise the cluster environment using your node and topology configuration.

In [None]:
%autoreload
from ddls.environments.cluster.cluster_environment import ClusterEnvironment

# Build the cluster environment from the topology and node configurations.
env = ClusterEnvironment(
    topology_config=topology_config,
    node_config=node_config,
    path_to_save='/scratch/datasets/ddls/sims',
    save_freq=100,
    use_sqlite_database=True,
)
print(env)
env.topology.render()

# Print what is stored on each node of the topology graph.
for node in env.topology.graph:
    print(f'Node {node}: {env.topology.graph.nodes[node]}')

Load the `ddls` computation graph(s) you want to run on the cluster.

In [None]:
%autoreload
from ddls.utils import ddls_graph_from_pbtxt_file
from ddls.plotting.plotting import plot_computation_graph

import glob


# get file paths
# N.B. glob returns matches in arbitrary order, so sort them to make the
# selection of the first `num_graphs` files reproducible across runs/machines.
path_to_files = '/scratch/datasets/ddls/jobs/tensorflow_synthetic_graphs/valid'
file_paths = sorted(glob.glob(path_to_files + '/*'))
if not file_paths:
    # fail here with a clear message rather than silently continuing with no graphs
    raise FileNotFoundError(f'No computation graph files found in {path_to_files}')

# create ddls graph(s) from (at most) the first num_graphs files
num_graphs = 100
ddls_computation_graphs = [
    ddls_graph_from_pbtxt_file(file_path, processor_type_profiled='A100', verbose=False)
    for file_path in file_paths[:num_graphs]
]

# visualise
visualise = False
if visualise:
    for graph in ddls_computation_graphs:
        fig = plot_computation_graph(graph, scaling_factor=3, title='ddls_graph', show_fig=True, verbose=True)

Initialise `ddls` job(s) from the computation graph(s)

In [None]:
%autoreload
from ddls.demands.jobs.job import Job

# Create one Job per loaded computation graph, each with num_training_steps=2.
jobs = [
    Job(computation_graph=computation_graph, num_training_steps=2)
    for computation_graph in ddls_computation_graphs
]

for job in jobs:
    print(job)

Initialise the control plane: a job placement agent and a job scheduler.

In [None]:
%autoreload
from ddls.managers.placers.random_job_placer import RandomJobPlacer
from ddls.managers.schedulers.srpt_job_scheduler import SRPTJobScheduler

# Control-plane components, looked up by role name when making decisions.
control_plane = dict(
    job_placer=RandomJobPlacer(),
    job_scheduler=SRPTJobScheduler(),
)

Reset the cluster environment and run the cluster simulation to completion.

In [None]:
%autoreload
from ddls.distributions.uniform import Uniform
from ddls.utils import seed_stochastic_modules_globally

import time
import pprint


# seeds = [0, 1, 2]
seeds = [0]
for seed in seeds:
    print(f'\n\n\n~~~~~~~~~~~~~~~~~~~~~~~ Seed {seed} ~~~~~~~~~~~~~~~~~~~~~~~')
    seed_stochastic_modules_globally(seed)

    # reset the environment with the job set and arrival settings for this seed
    obs, action_set, reward, done, info = env.reset(
        jobs=jobs,
        job_sampling_mode='remove',
        job_interarrival_time_dist=Uniform(min_val=1, max_val=1000),
        max_simulation_run_time=float('inf'),
        job_queue_capacity=10,
        seed=seed,
        verbose=True,
    )

    start_time = time.time()
    while not done:
        # make decisions (placement first: the scheduler is given the new placements)
        job_placement = control_plane['job_placer'].get_placement(cluster=env)
        job_schedule = control_plane['job_scheduler'].get_schedule(new_placements=job_placement, cluster=env)
        actions = {'job_placement': job_placement, 'job_schedule': job_schedule}

        # pass actions to cluster environment and step the cluster
        obs, action_set, reward, done, info = env.step(actions, verbose=False)

        print(f'Step {env.step_counter} | Jobs arrived: {env.num_jobs_arrived} | completed: {len(env.jobs_completed)} | blocked: {len(env.jobs_blocked)} | running: {len(env.jobs_running)} | queued: {len(env.job_queue)}')

    print(f'\nCompleted simulation in {time.time() - start_time:.3f} s')

**TODO**: Implement logic for placing job ops of each job -> step env -> time job completion for 1 training step (can then think about $n$ training steps, network communication overhead, etc.)

- Jobs have now been mounted onto devices.
- Now need to work out a way of efficiently tracking the ops running on each device and timing how long they take. Consider having a global dict tracking the operations which are running, to avoid having to keep looping through all ops to check dependencies. Consider also having a stopwatch object similar to Noah's, which is only ticked once enough sequential operations have been stacked. N.B. I think we should assume that, once ops have been placed on a device, they must be run sequentially (i.e. multiple ops cannot run on one device at the same time; assume the time profile is for e.g. a GPU worker running just that one op with <= all of its cores).

Let's load our SQLite logs and plot some data.

In [None]:
%autoreload
from collections import defaultdict
from sqlitedict import SqliteDict
import pprint
import glob
import time


base_folder = '/scratch/datasets/ddls/sims/'
base_name = 'cluster'
ids = [20]


def _extend_logs_from_sqlite(sqlite_path, agent_logs):
    '''Extends the lists in agent_logs with the entries of a SqliteDict log file.

    Args:
        sqlite_path: Path of the .sqlite log file to read.
        agent_logs: Mapping from log key to list (e.g. a defaultdict(list));
            extended in place with the values stored under each key.
    '''
    # the context manager closes the database on exit, so no explicit close() is needed
    with SqliteDict(sqlite_path) as log:
        for key, val in log.items():
            agent_logs[key].extend(val)


steps_logs_dict = defaultdict(lambda : defaultdict(list))
sim_logs_dict = defaultdict(lambda : defaultdict(list))
start_time = time.time()
for i in ids:
    agent = base_name + f'_{i}'
    # glob returns folders in arbitrary order; sort (lexicographically) so that
    # the logs of the reset folders are always concatenated in the same order
    paths = sorted(glob.glob(base_folder + f'/{base_name}/{agent}/*/'))

    for path in paths:
        _extend_logs_from_sqlite(path + '/steps_log.sqlite', steps_logs_dict[agent])
        _extend_logs_from_sqlite(path + '/sim_log.sqlite', sim_logs_dict[agent])

print(f'\nsteps_logs_dict: {steps_logs_dict}\n')
print(f'\nsim_logs_dict: {sim_logs_dict}\n')
print(f'\nAll data loaded in {time.time() - start_time:.3f} s.')

## Step-level metrics

E.g. How many jobs, if any, were completed at each step?

In [None]:
import copy

def augment_steps_logs_dict(steps_logs_dict):
    '''Returns a deep copy of the steps logs with a derived 'step_time' metric.

    For each agent, 'step_time' is the element-wise difference between that
    agent's 'step_end_time' and 'step_start_time' entries.
    '''
    augmented = copy.deepcopy(steps_logs_dict)
    for agent, agent_log in steps_logs_dict.items():
        step_times = []
        for idx in range(len(agent_log['step_start_time'])):
            step_times.append(agent_log['step_end_time'][idx] - agent_log['step_start_time'][idx])
        augmented[agent]['step_time'] = step_times
    return augmented
# rebind the name to the deep copy returned by the helper, which additionally holds the 'step_time' metric
steps_logs_dict = augment_steps_logs_dict(steps_logs_dict)

In [None]:
%autoreload
from ddls.plotting.plotting import plot_line

import pandas as pd
from collections import defaultdict


# plot config
x = 'step_counter'
scaling_factor = 1
metrics_to_skip = {'step_counter'}

# make plots
# collect the metrics logged by any agent (in first-seen order) instead of
# reading them from an `agent` variable left over from an earlier cell's loop
metrics = list(dict.fromkeys(metric for agent_log in steps_logs_dict.values() for metric in agent_log))
metrics_to_plot = [metric for metric in metrics if metric not in metrics_to_skip]
print(f'Metrics to plot: {metrics_to_plot}\nMetrics to skip: {metrics_to_skip}\n')
for metric in metrics_to_plot:
    print(f'Plotting metric {metric}')
    # accumulate the rows of every agent so that each agent contributes its own
    # data to the figure (rather than only the last agent's entry being kept)
    agent_col, x_col, metric_col = [], [], []
    for _agent, agent_log in steps_logs_dict.items():
        if metric not in agent_log:
            # this agent did not log the metric
            continue
        metric_vals = agent_log[metric]
        agent_col.extend([_agent] * len(metric_vals))
        x_col.extend(agent_log[x])
        metric_col.extend(metric_vals)
    plot_dict = {'Agent': agent_col, x: x_col, metric: metric_col}
    fig = plot_line(pd.DataFrame(plot_dict), 
                    x=x, 
                    y=metric, 
                    hue='Agent', 
                    xlabel=x, 
                    ylabel=metric, 
                    err_style='band', # 'band' 'bars'
                    ci=68, # 95 68
                    scaling_factor=scaling_factor,
                    show_fig=True)

## Sim-level metrics

E.g. Mean job completion time?

In [None]:
# import copy
# import numpy as np

# def augment_sim_logs_dict(sim_logs_dict):
#     '''Calculates additional metrics for sim logs dict.'''
#     _sim_logs_dict = copy.deepcopy(sim_logs_dict)
#     for agent in sim_logs_dict.keys():
#         for metric in ['job_completion_time']:
#             _sim_logs_dict[agent][f'mean_{metric}'] = np.mean(sim_logs_dict[agent][metric])
#             _sim_logs_dict[agent][f'p99_{metric}'] = np.percentile(sim_logs_dict[agent][metric], 99)
#             _sim_logs_dict[agent][f'median_{metric}'] = np.median(sim_logs_dict[agent][metric])
#             _sim_logs_dict[agent][f'std_{metric}'] = np.std(sim_logs_dict[agent][metric])
#     return _sim_logs_dict
# sim_logs_dict = augment_sim_logs_dict(sim_logs_dict)

In [None]:
%autoreload
from ddls.plotting.plotting import plot_bar, plot_hist

import pandas as pd
from collections import defaultdict
import numpy as np
import scipy.stats as st


# plot config
scaling_factor = 1
metrics_to_skip = set()  # names of metrics to leave out of the plots (none by default)
# summary statistics for the bar charts, keyed by the name used as each chart's title
estimators = {'mean': np.mean,
              'median': np.median,
              'iqr': st.iqr,
              'gmean': st.gmean}


# make plots
# collect the metrics logged by any agent (in first-seen order) instead of
# reading them from an `agent` variable left over from an earlier cell's loop
metrics = list(dict.fromkeys(metric for agent_log in sim_logs_dict.values() for metric in agent_log))
metrics_to_plot = [metric for metric in metrics if metric not in metrics_to_skip]
print(f'Metrics to plot: {metrics_to_plot}\nMetrics to skip: {metrics_to_skip}\n')
for metric in metrics_to_plot:
    print(f'Plotting metric {metric}')
    # accumulate the rows of every agent so that each agent contributes its own
    # data to the figures (rather than only the last agent's entry being kept)
    agent_col, metric_col = [], []
    for _agent, agent_log in sim_logs_dict.items():
        if metric not in agent_log:
            # this agent did not log the metric
            continue
        metric_vals = agent_log[metric]
        agent_col.extend([_agent] * len(metric_vals))
        metric_col.extend(metric_vals)
    plot_dict = {'Agent': agent_col, metric: metric_col}

    df = pd.DataFrame(plot_dict)

    # hist
    fig = plot_hist(df,
                    x=metric,
                    hue='Agent',
                    xlabel=metric,
                    element='bars',
                    fill=True,
                    cumulative=False,
                    stat='count',
                    multiple='layer',
                    scaling_factor=scaling_factor,
                    show_fig=True)

    # cdf
    fig = plot_hist(df,
                    x=metric,
                    hue='Agent',
                    xlabel=metric,
                    element='step',
                    fill=False,
                    cumulative=True,
                    stat='density',
                    common_norm=False,
                    scaling_factor=scaling_factor,
                    show_fig=True)

    # bar chart (one per estimator, titled with the estimator's name)
    for estimator_name, estimator in estimators.items():
        fig = plot_bar(df, 
                        x='Agent', 
                        y=metric, 
                        xlabel='Agent', 
                        ylabel=metric, 
                        estimator=estimator,
                        title=estimator_name,
                        scaling_factor=scaling_factor,
                        show_fig=True)