# Profiling Configurations of Parsl

The purpose of this notebook is to explore the performance issues that are happening when scaling Parsl for Desc. It seems that parsl is limited to launching between 40 and 80 tasks per second. This causes issues since Parsl is unable to saturate more than a few nodes. In this notebook, we will first time different configurations of parsl to determine under which configurations we see the performance issues. The next section of the notebook will be dedicated to profiling the launching thread to determine if we are able to find bottlenecks where Parsl is taking longer than expected. At first the plan is to run this notebook locally. Perhaps if the issues are within some Parsl method, we will be able to find the results this way. Otherwise, we will run the same tests on NERSC under the assumption that some interatction between Parsl and the nersc file system or scheduler.

## 0. Setup

In [12]:
import cProfile
import itertools
import multiprocessing
import os
import pstats
from pstats import SortKey
import time

import pandas as pd

import parsl

In [2]:
@parsl.python_app
def python_noop(name):
    return f"""Finished job {name}"""

@parsl.bash_app
def bash_noop(name):
    return f"echo Finished job {name}"

@parsl.python_app
def python_sleep(name, trun):
    import time
    print(f"""Running job {name} for {trun:.1f} seconds""")
    time.sleep(trun)
    return f"""Finished job {name}"""

@parsl.bash_app
def bash_sleep(name, trun):
    scom = f"time sleep {trun}; echo Finished job {name}"
    return scom

In [3]:
monitoring_hub = parsl.monitoring.monitoring.MonitoringHub(
    hub_address=parsl.addresses.address_by_hostname(),
    hub_port=55055,
    monitoring_debug=False,
)

In [15]:
def setup(sexec="thread", workers=1, nnodes=1, with_monitoring=False):
    if sexec == "thread":
        executor = parsl.ThreadPoolExecutor(
            max_threads=workers
        )
    elif sexec == "htex":
        executor = parsl.HighThroughputExecutor(
            label="local_htex",
            cores_per_worker=1,
            max_workers=workers,
            address=parsl.addresses.address_by_hostname(),
            provider=parsl.providers.LocalProvider(
                worker_init="source activate parsl-monitoring"
            )
        )
    elif sexec == "work_queue":
        executor = parsl.WorkQueueExecutor(
            worker_options=f"--cores={workers}",
            provider=parsl.providers.LocalProvider(
                worker_init="source activate parsl-monitoring"
            )
        )
    else:
        raise Exception(f"Invalid executor specifier: {sexec}")

    monitoring=None
    if with_monitoring == True:
        monitoring = monitoring_hub

    config = parsl.config.Config(
        executors = [executor],
        monitoring=parsl.monitoring.monitoring.MonitoringHub(
            hub_address=parsl.addresses.address_by_hostname(),
            hub_port=55055,
            monitoring_debug=False,
        ),
        strategy=None,
    )

    try:
        dfk = parsl.dfk()
    except Exception as e:
        print(e)
        dfk = None

    if dfk is not None:
        if dfk.monitoring is not None:
            dfk.monitoring.close()
        parsl.clear()

    parsl.load(config)
    return

def cleanup():
    for child in multiprocessing.active_children():
        print('Terminating', child)
        child.terminate()

In [5]:
def run_test(test_name, jobtype="python", trun=0, njob=1):
    print(f"Running test: {test_name}")

    if trun == 0:
        task = lambda name, trun: eval(f"{jobtype}_noop(name)")
    else:
        task = lambda name, trun: eval(f"{jobtype}_sleep(name, trun)")

    jobs = []
    for ijob in range(njob):
        jobs.append(task(f"job_{ijob}", trun))

    results = []
    for ijob, job in enumerate(jobs):
        try:
            r = job.result()
        except Exception as e:
            print(f"Job {ijob} raised an exception: ", e)
            r = None
        results.append(r)

## 1. Timing

Construct the scaling information for many different configurations of executing Parsl

In [6]:
types = ["python", "bash"]
executors = ["thread", "htex", "work_queue"]
times = [0]
ntasks = [1, 100, 200, 300, 400, 500]
workers = [1, 32]
num_nodes = [1]
monitoring = [False, True]

In [6]:
types = ["python"]
executors = ["htex", "work_queue"]
times = [0]
ntasks = [100]
workers = [32]
num_nodes = [1]
monitoring = [True, False]

In [7]:
test_results = []
for (task_type, e, t, n, w, nodes, m) in itertools.product(types, executors, times, ntasks, workers, num_nodes, monitoring):
    stmt = f""""python test_parsl.py -y {task_type} -n {m} -t {t} -e {e} -w {w} --nodes {nodes}"""
    if monitoring:
        stmt += " -m"
    
    t0 = time.time()
    os.system(stmt)
    t1 = time.time()
    elapsed_time = t1 - t0
    
    test_results.append((task_type, e, t, n, w, nodes, m, elapsed_time))
    
    
df = pd.DataFrame.from_records(test_results, columns=["type", "executor", "time_per_task", "ntasks", "workers", "nodes", "monitoring", "total time"])
df.head()

Must first load config
run_test("python_n=100_t=0_e=htex_w=32_nodes=1_m=True", "python", 0, 100)
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
run_test("python_n=100_t=0_e=htex_w=32_nodes=1_m=False", "python", 0, 100)
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=False
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=False
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=False
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=False
Running test

Unnamed: 0,type,executor,time_per_task,ntasks,workers,nodes,monitoring,total time
0,python,htex,0,100,32,1,True,2.934003
1,python,htex,0,100,32,1,False,3.709631
2,python,work_queue,0,100,32,1,True,186.618081
3,python,work_queue,0,100,32,1,False,176.128398


Exception in thread WorkQueue-collector-thread:
Traceback (most recent call last):
  File "/home/alokvk2/.conda/envs/parsl-monitoring/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/alokvk2/.conda/envs/parsl-monitoring/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/alokvk2/src/parsl/parsl/process_loggers.py", line 27, in wrapped
    r = func(*args, **kwargs)
  File "/home/alokvk2/src/parsl/parsl/executors/workqueue/executor.py", line 699, in _collect_work_queue_results
    raise ExecutorError(self, "Workqueue Submit Process is not alive")
parsl.executors.errors.ExecutorError: Executor WorkQueueExecutor failed due to: Workqueue Submit Process is not alive


## 2. Profiling

In this section we will start trying to profile the parsl code to figure out which bits contribute the most to the execution time. This is not going to be perfect because parsl is a multi-process application. However, it may be sufficient to profile the launching thread because it seems that in some cases the launching of the process becomes the bottleneck.

In [17]:
profiling_stats = []
pr = cProfile.Profile()
for test_no, (task_type, e, t, n, w, nodes, m) in enumerate(itertools.product(types, executors, times, ntasks, workers, num_nodes, monitoring)):
    test_name = f"{task_type}_n={n}_t={t}_e={e}_w={w}_nodes={nodes}_m={m}"
    stmt = f""""test_parsl.py -y {task_type} -n {m} -t {t} -e {e} -w {w} --nodes {nodes}"""
    if monitoring:
        stmt += " -m"
    
    t0 = time.time()
    os.system(f"python -m cProfile -o {test_name}.prof {stmt}")
    t1 = time.time()
    elapsed_time = t1 - t0
    
    print(f"Finished test {test_no} in {elapsed_time} seconds")

---------------------------------------------------------------------------------------
Profiling statistics for test: 0. python_n=100_t=0_e=htex_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=True
 
*** Profile stats marshalled to file '/tmp/user/23030/tmpevkavpmz'.
Embedding SnakeViz in this document...


---------------------------------------------------------------------------------------
Terminating <ForkProcess name='Monitoring-DBM-Process' pid=2749719 parent=2745012 started daemon>
Terminating <ForkProcess name='Monitoring-Router-Process' pid=2749715 parent=2745012 started daemon>
Terminating <ForkProcess name='HTEX-Interchange' pid=2749738 parent=2745012 started daemon>
Terminating <Process name='Monitoring-Filesystem-Process' pid=2749720 parent=2745012 started daemon>
---------------------------------------------------------------------------------------
Profiling statistics for test: 1. python_n=100_t=0_e=htex_w=32_nodes=1_m=False
Running test: python_n=100_t=0_e=htex_w=32_nodes=1_m=False
 
*** Profile stats marshalled to file '/tmp/user/23030/tmprubtgyee'.
Embedding SnakeViz in this document...


---------------------------------------------------------------------------------------
Terminating <Process name='Monitoring-Filesystem-Process' pid=2750320 parent=2745012 started daemon>
Terminating <ForkProcess name='Monitoring-Router-Process' pid=2750315 parent=2745012 started daemon>
Terminating <ForkProcess name='Monitoring-DBM-Process' pid=2750316 parent=2745012 started daemon>
Terminating <ForkProcess name='HTEX-Interchange' pid=2750338 parent=2745012 started daemon>
---------------------------------------------------------------------------------------
Profiling statistics for test: 2. python_n=100_t=0_e=work_queue_w=32_nodes=1_m=True
Running test: python_n=100_t=0_e=work_queue_w=32_nodes=1_m=True
 
*** Profile stats marshalled to file '/tmp/user/23030/tmp47gch80z'.
Embedding SnakeViz in this document...


---------------------------------------------------------------------------------------
Terminating <ForkProcess name='Monitoring-Router-Process' pid=2750913 parent=2745012 started daemon>
Terminating <ForkProcess name='Monitoring-DBM-Process' pid=2750914 parent=2745012 started daemon>
Terminating <Process name='WorkQueue-Submit-Process' pid=2750929 parent=2745012 started>
Terminating <Process name='Monitoring-Filesystem-Process' pid=2750918 parent=2745012 started daemon>


Exception in thread WorkQueue-collector-thread:
Traceback (most recent call last):
  File "/home/alokvk2/.conda/envs/parsl-monitoring/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/alokvk2/.conda/envs/parsl-monitoring/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/alokvk2/src/parsl/parsl/process_loggers.py", line 27, in wrapped
    r = func(*args, **kwargs)
  File "/home/alokvk2/src/parsl/parsl/executors/workqueue/executor.py", line 699, in _collect_work_queue_results
    raise ExecutorError(self, "Workqueue Submit Process is not alive")
parsl.executors.errors.ExecutorError: Executor WorkQueueExecutor failed due to: Workqueue Submit Process is not alive


---------------------------------------------------------------------------------------
Profiling statistics for test: 3. python_n=100_t=0_e=work_queue_w=32_nodes=1_m=False
Running test: python_n=100_t=0_e=work_queue_w=32_nodes=1_m=False
 
*** Profile stats marshalled to file '/tmp/user/23030/tmp7ytcpzuy'.
Embedding SnakeViz in this document...


---------------------------------------------------------------------------------------
Terminating <Process name='Monitoring-Filesystem-Process' pid=2751517 parent=2745012 started daemon>
Terminating <ForkProcess name='Monitoring-DBM-Process' pid=2751513 parent=2745012 started daemon>
Terminating <ForkProcess name='Monitoring-Router-Process' pid=2751512 parent=2745012 started daemon>
Terminating <Process name='WorkQueue-Submit-Process' pid=2751524 parent=2745012 started>


Exception in thread WorkQueue-collector-thread:
Traceback (most recent call last):
  File "/home/alokvk2/.conda/envs/parsl-monitoring/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/alokvk2/.conda/envs/parsl-monitoring/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/alokvk2/src/parsl/parsl/process_loggers.py", line 27, in wrapped
    r = func(*args, **kwargs)
  File "/home/alokvk2/src/parsl/parsl/executors/workqueue/executor.py", line 699, in _collect_work_queue_results
    raise ExecutorError(self, "Workqueue Submit Process is not alive")
parsl.executors.errors.ExecutorError: Executor WorkQueueExecutor failed due to: Workqueue Submit Process is not alive
