# ScheduleFlow 

Simple example for running a simulation of online or batch schedulers

In [1]:
import sys
sys.path.append("..")
import ScheduleFlow

import numpy as np

In [2]:
# Number of processing units in the simulated machine; it sizes the
# ScheduleFlow.System and caps how many units a generated job may use
num_processing_units = 10

## 1. Creating the workload 

The workload consists of *num_jobs* jobs:
 - Running for x seconds (with x randomly chosen between 10 and 100)
 - Two of them requesting the exact walltime
 - The rest requesting more time than needed (in increments of 10 seconds)
 - Executing on a random number of processing units (between 1 and *num_processing_units*)
 
The workload has an extra job running for 100 seconds on the entire machine and requesting 90 seconds. In case of failure, the second submission will request 135 seconds.

In [3]:
def create_job_list(num_processing_units, num_jobs, seed=None):
    """Build a random workload plus one full-machine job.

    Every random job is submitted at time 0, runs for a random number of
    seconds in [10, 100] on a random number of processing units in
    [1, num_processing_units], and requests its execution time plus
    ``(i // 2) * 10`` seconds, where ``i`` is the job index. The first two
    jobs therefore request exactly the time they need.

    One extra job is always added: it occupies all ``num_processing_units``
    units, runs for 100 seconds and carries the request sequence
    ``[90, 135]`` (the first request is shorter than its execution time).

    Args:
        num_processing_units: number of processing units in the machine.
        num_jobs: number of random jobs to generate (the full-machine job
            is added on top of these).
        seed: optional seed for a private ``numpy.random.RandomState`` so
            the workload can be reproduced. With the default ``None`` the
            global NumPy random state is used, as it was before this
            parameter existed.

    Returns:
        A set of ``ScheduleFlow.Application`` objects.
    """
    # np.random (module) and RandomState expose the same randint(low, high)
    rng = np.random if seed is None else np.random.RandomState(seed)
    submission_time = 0

    job_list = set()
    for i in range(num_jobs):
        # keep the draw order (execution time first, then units) so that a
        # globally seeded run yields the same workload as the old code
        execution_time = rng.randint(10, 101)
        request_time = execution_time + (i // 2) * 10
        processing_units = rng.randint(1, num_processing_units + 1)
        job_list.add(ScheduleFlow.Application(
            processing_units,
            submission_time,
            execution_time,
            [request_time]))

    # full-machine job whose first request (90) is below its run time (100)
    job_list.add(ScheduleFlow.Application(num_processing_units,
                                          submission_time,
                                          100, [90, 135]))
    return job_list

# Small demo workload: 3 random jobs plus the full-machine job
job_list = create_job_list(num_processing_units, num_jobs=3)
# print() gives the human-readable description of a single job ...
print(next(iter(job_list)))
# ... while the bare expression displays the repr of the whole set
job_list

Job -1: 3 nodes; 0.0 submission time; 12.0 total execution time (12.0 requested)


{Job(Nodes: 3, Submission: 0.0, Walltime: 12.0, Request: 12.0),
 Job(Nodes: 7, Submission: 0.0, Walltime: 43.0, Request: 43.0),
 Job(Nodes: 9, Submission: 0.0, Walltime: 96.0, Request: 106.0),
 Job(Nodes: 10, Submission: 0.0, Walltime: 100.0, Request: 90.0)}

## 2. Creating the simulation scenario

- Workload of 10 successful jobs and one failed job
- Checks for correctness at the end
- Outputs the results on stdout
- Uses 10 loops and returns the average metrics

The simulation uses a **batch scheduler** 

In [4]:
# Workload for the scenario: 10 random jobs plus the full-machine job
job_list = create_job_list(num_processing_units, 10)

# Simulator that checks correctness, writes its results to stdout and
# is configured with 10 loops
simulator = ScheduleFlow.Simulator(
    check_correctness=True,
    output_file_handler=sys.stdout,
    loops=10,
)
simulator

Simulator(GIF: False, Check_correctness: True, Loops: 10, Output: <ipykernel.iostream.OutStream object at 0x7f957091f5f8>, Jobs: 0)

In [5]:
# Scheduler operating on a system with num_processing_units nodes
sch = ScheduleFlow.Scheduler(ScheduleFlow.System(num_processing_units))
# The printed summary names the scheduler type and shows node/queue state
print(sch)

Batch Scheduler: System: 10 total nodes (10 currently free); Wait queue: total of 1; 0 jobs running


In [6]:
# Hand the scheduler and the workload over to the simulator
simulator.create_scenario(sch, job_list=job_list)
# Display the simulator again; its repr includes the number of jobs it holds
simulator

Simulator(GIF: False, Check_correctness: True, Loops: 10, Output: <ipykernel.iostream.OutStream object at 0x7f957091f5f8>, Jobs: 11)

## 3. Get the results

The simulator run function *outputs the results* of each loop on the handler provided (stdout in this case) and *returns a structure with the average performance* for each metric.

In [7]:
# Run the scenario asking only for the "system" metrics; per-loop results go
# to the simulator's output handler and the returned value is kept in results
results = simulator.run(metrics=["system"])

Scenario name : system makespan : system utilization : 
ScheduleFlow : 791.00 : 0.54 : 
ScheduleFlow : 791.00 : 0.54 : 
ScheduleFlow : 791.00 : 0.54 : 
ScheduleFlow : 791.00 : 0.54 : 
ScheduleFlow : 791.00 : 0.54 : 
ScheduleFlow : 791.00 : 0.54 : 
ScheduleFlow : 791.00 : 0.54 : 
ScheduleFlow : 791.00 : 0.54 : 
ScheduleFlow : 791.00 : 0.54 : 
ScheduleFlow : 791.00 : 0.54 : 


In [8]:
# Display the metrics dictionary returned by simulator.run()
results

{'system makespan': 791.0, 'system utilization': 0.5391908975979771}

# Example to test correctness

- The number of loops within the same workload is set to 1
- We run the simulation 10 times over different workloads and return the average utilization

In [9]:
# New simulator with correctness checks enabled; the loop count and output
# handler are not passed, so they keep their default values
simulator = ScheduleFlow.Simulator(check_correctness=True)
simulator

Simulator(GIF: False, Check_correctness: True, Loops: 1, Output: None, Jobs: 0)

In [10]:
# Simulate `loops` independently generated workloads and average each metric
loops = 10
avg_results = {}
for i in range(loops):
    job_list = create_job_list(num_processing_units, 10)
    results = simulator.run_scenario(sch, job_list, metrics=["utilization"])
    if not avg_results:
        # first workload: its metrics seed the running totals
        avg_results = results
    else:
        # later workloads: add each metric to its running total
        avg_results = {
            metric: avg_results[metric] + results[metric]
            for metric in avg_results
        }

# turn the per-metric totals into means
avg_results = {metric: total / loops for metric, total in avg_results.items()}
print(avg_results)

{'job utilization': 0.7231119245082526, 'system utilization': 0.5986555376063898}


In [11]:
# Retrieve every available metric for the most recent simulation run
# (only the last of the workloads simulated above)
simulator.get_stats_metrics(["all"])

{'job failures': 1,
 'job response time': 473.0,
 'job stretch': 13.151740104620387,
 'job utilization': 0.6760499179855756,
 'job wait time': 376.3333333333333,
 'system makespan': 716,
 'system utilization': 0.590782122905028}

In [12]:
# Fetch the execution log of the last run; the cells below index it by job
# and treat each value as a list of [start, end] pairs
execution_log = simulator.get_execution_log()
# Validate the untouched log. It displays 0 here, while the tampered logs
# below display 1 -- presumably a failure count or flag; confirm in ScheduleFlow
simulator.test_correctness(execution_log=execution_log)

0

## Inject failures in the execution log

### 1. Within one job's execution

For the failed job, make the second execution start before the ending of the first

In [13]:
# Corrupt the log so that one job's own executions overlap in time
for job in execution_log:
    runs = execution_log[job]
    # only a job that was executed more than once (the failed job) qualifies
    if len(runs) <= 1:
        continue
    # shift every execution of that job 1000 time units later
    shifted = [[run[0] + 1000, run[1] + 1000] for run in runs]
    # start the second execution one time unit before the first one ends ...
    shifted[1][0] = shifted[0][1] - 1
    # ... and let it last for the job's walltime
    shifted[1][1] = shifted[1][0] + job.walltime
    execution_log[job] = shifted
    # corrupting a single job is enough
    break

simulator.test_correctness(execution_log=execution_log)

Job 6: 10 nodes; 0.0 submission time; 100.0 total execution time (90.0 requested) did not pass the sanity check: [[1250, 1340], [1339, 1439]]


1

### 2. Between all jobs' executions

Move the first execution of the failed job (*which uses the entire machine*) over a moment when other jobs are running

In [14]:
# Create a time window in which more than the total number of nodes is allocated.
# `start` and `end` stay at module level: the next cell reads them.
start = 0
for job in execution_log:
    # only jobs with more than one execution (the failed job) are touched
    if len(execution_log[job]) <= 1:
        continue
    # a first execution at exactly 1000 is relocated to 100 instead of 0
    # (presumably so it still collides with other jobs -- TODO confirm)
    if execution_log[job][0][0] == 1000:
        start = 100
    end = start + job.request_walltime
    # move the first execution of the job to the beginning of the simulation
    execution_log[job][0][0], execution_log[job][0][1] = start, end
simulator.test_correctness(execution_log=execution_log)

Full schedule did not pass sanity check


1

In [15]:
# Print every job whose first execution may intersect the [start, end] window
# set in the previous cell; the end of each execution is estimated from the
# job's requested walltime rather than read from the log
for job in execution_log:
    first_start = execution_log[job][0][0]
    requested_end = first_start + job.request_walltime
    if first_start <= end and requested_end >= start:
        print(job)
        print(execution_log[job])

Job 2: 9 nodes; 0.0 submission time; 95.0 total execution time (125.0 requested)
[[0, 95]]
Job 6: 10 nodes; 0.0 submission time; 100.0 total execution time (90.0 requested)
[[0, 90], [1339, 1439]]
