# [ParslFest 2025](https://parsl-project.org/parslfest/parslfest2025.html)

# [Accelerating QMCPy Notebook Tests with Parsl](https://www.figma.com/slides/k7EUosssNluMihkYTLuh1F/Parsl-Testbook-Speedup?node-id=1-37&t=WnKcu2QYO8JXvtpP-0)

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/QMCSoftware/QMCSoftware/blob/develop/demos/talk_paper_demos/parsl_fest_2025/parsl_fest_2025.ipynb)

Joshua Herman, Brandon Sharp, and Sou-Cheng Choi, QMCPy Developers

Aug 28 -- 29, 2025

Updated: Dec 1, 2025


**Requirements**:

* testbook: `pip install testbook==0.4.2`
* Parsl: `pip install parsl==2025.7.28`

In [None]:
# Import Parsl, installing it on the fly when it is missing from this environment.
try:
    import parsl as pl
except ModuleNotFoundError:
    # IPython shell escape: quietly installs Parsl with pip. This branch only
    # installs the package; it does not bind the name `pl` itself.
    !pip install -q parsl

In [None]:
import sys
import os
import time
import parsl as pl

# Ensure the path to the booktests directory is included (robust finder)
def _find_repo_root(start=None):
    """Return the nearest ancestor directory that contains ``pyproject.toml``.

    The search begins at ``start`` and walks towards the filesystem root,
    returning the first directory in which a ``pyproject.toml`` entry exists.

    Args:
        start: Directory to start from. Defaults to the current working
            directory at call time (not at definition time), so the result
            stays correct if the working directory changes between calls.
            Relative paths are resolved against the current working
            directory so the walk can climb above it.

    Returns:
        The absolute path of the first directory, from ``start`` upwards,
        that contains ``pyproject.toml``.

    Raises:
        FileNotFoundError: If the filesystem root is reached without finding
            a ``pyproject.toml``.
    """
    cur = os.path.abspath(os.getcwd() if start is None else start)
    while True:
        if os.path.exists(os.path.join(cur, 'pyproject.toml')):
            return cur
        parent = os.path.dirname(cur)
        # dirname() of the filesystem root is the root itself: nowhere left to look.
        if parent == cur:
            raise FileNotFoundError('repo root not found')
        cur = parent

# Make the helpers under <repo>/test/booktests importable. The membership
# check keeps sys.path free of duplicate entries when this cell is re-run.
_booktests_dir = os.path.join(_find_repo_root(), 'test', 'booktests')
if _booktests_dir not in sys.path:
    sys.path.append(_booktests_dir)

# Configuration flags
force_compute = True  # recompute timings even if the parallel timing CSV already exists
is_debug = True       # run only a short, fixed list of tests instead of the full suite

# Create output directory if it doesn't exist
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

## 2. Parsl

1. Install and Configure Parsl
2. Run the tests in parallel with Parsl

### 2.1 Configure Parsl

In [None]:
import sys
from parsl.config import Config
from parsl.executors import HighThroughputExecutor, ThreadPoolExecutor
from parsl.providers import LocalProvider
import os
import platform

# Read the worker count from PARSL_MAX_WORKERS when it holds a positive
# integer; otherwise fall back to cpu_count() - 1 (never below one worker).
_default_workers = max(1, (os.cpu_count() or 2) - 1)
try:
    max_workers = int(os.environ['PARSL_MAX_WORKERS'])
except (KeyError, ValueError):
    # Variable is unset or is not an integer literal.
    max_workers = _default_workers
else:
    if max_workers < 1:
        # A zero or negative worker count cannot run any task.
        max_workers = _default_workers

# Choose executor based on platform to avoid macOS spawn/interchange issues
if platform.system().lower() == "darwin":
    # macOS: use threads to avoid subprocess/interchange problems
    exec_obj = ThreadPoolExecutor(max_threads=max_workers, label="local_threads")
else:
    # Other platforms: use HTEX with explicit interchange command, launched
    # with the same interpreter that is running this notebook.
    interchange_cmd = [sys.executable, "-m", "parsl.executors.high_throughput.interchange"]
    exec_obj = HighThroughputExecutor(
        label="htex_local",
        max_workers_per_node=max_workers,
        provider=LocalProvider(init_blocks=1, max_blocks=1),
        interchange_launch_cmd=interchange_cmd,
    )

config = Config(executors=[exec_obj])

# Ensure clean state: clear any existing Parsl config from previous runs.
# NOTE(review): clear() presumably only forgets the previously loaded
# DataFlowKernel; confirm whether an earlier DFK also needs dfk().cleanup()
# to stop its workers when this cell is re-run.
pl.clear()

# Now load the config
pl.load(config)

# Report the worker count from whichever attribute the chosen executor exposes,
# falling back to the requested value.
if hasattr(exec_obj, "max_workers_per_node"):
    workers_info = exec_obj.max_workers_per_node
elif hasattr(exec_obj, "max_threads"):
    workers_info = exec_obj.max_threads
else:
    workers_info = max_workers

print(f"Parsl loaded with {workers_info} workers")

### 2.2 Create a Parsl Test Runner

In [None]:
import parsl_test_runner
import inspect

# List every plain function that the parsl_test_runner module exposes.
functions = inspect.getmembers(parsl_test_runner, predicate=inspect.isfunction)
print("Functions:")
for name, func in functions:
    print(f"- {name}")

# Visual separator between the listing and the help text.
separator = "=" * 50
print("\n" + separator)

# Show the built-in help for the entry point used to run tests in parallel.
print("Help for execute_parallel_tests:")
help(parsl_test_runner.execute_parallel_tests)

In [None]:
# Sanity check: show what was requested and what Parsl actually has loaded.
print(f"Max workers configured: {max_workers}")
print(f"Active Parsl DFK: {pl.dfk()}")
print(f"Executors: {[executor.label for executor in pl.dfk().executors.values()]}")

# Report the per-node worker limit for each configured executor that has one
# (a config without an `executors` attribute is treated as having none).
for executor in getattr(config, 'executors', ()):
    if not hasattr(executor, 'max_workers_per_node'):
        continue
    print(f"Executor '{executor.label}' max_workers_per_node: {executor.max_workers_per_node}")

### 2.3 Run the Notebooks in Parallel with Parsl

In [None]:
import uuid
import subprocess
import re

# Short random tag so the printed output of separate runs can be told apart.
execution_id = str(uuid.uuid4())[:8]
print(f"=== EXECUTION ID: {execution_id} ===")
print(f"Starting parallel test execution with {max_workers} workers...")

# Per-worker-count result files: the timing CSV and the raw make output.
par_fname = os.path.join(output_dir, f"parallel_times_{max_workers}.csv")
par_output = os.path.join(output_dir, f"parallel_output_{max_workers}.txt")
is_linux = sys.platform.startswith("linux")

# Skip the (slow) run when a timing file already exists, unless forced.
if (not os.path.exists(par_fname)) or force_compute:
    repo_root = _find_repo_root()

    # Build the command as an argv list. No shell is involved, so every
    # option must be its own element ("make -j1" as a single element would
    # be looked up as a program with that literal name and fail).
    cmd = ["make"]
    if is_linux and not is_debug:
        cmd.append("-j1")
    cmd.append("booktests_parallel_no_docker")
    if is_debug:
        # Debug mode: run only a short, fixed list of tests.
        tests = "tb_quickstart tb_qmcpy_intro tb_lattice_random_generator"
        cmd.append(f"TESTS={tests}")
    if is_linux:
        # Run make under taskset with CPU list "0".
        cmd = ["taskset", "-c", "0"] + cmd

    # Propagate PARSL_MAX_WORKERS environment variable to subprocess
    env = os.environ.copy()
    env['PARSL_MAX_WORKERS'] = str(max_workers)

    # Capture stdout and stderr together in the output file. A failing run is
    # reported rather than raised, so whatever output exists is still parsed.
    with open(par_output, 'wb') as out_f:
        completed = subprocess.run(
            cmd,
            cwd=repo_root,
            stdout=out_f,
            stderr=subprocess.STDOUT,
            check=False,
            env=env,
        )
    if completed.returncode != 0:
        print(f"WARNING: {' '.join(cmd)} exited with status {completed.returncode}; see {par_output}")

    # parse parallel time from output (sum of individual test times, not wall-clock)
    with open(par_output, 'r', encoding='utf-8', errors='ignore') as f:
        text = f.read()
    match = re.search(r"Total test time: ([\d\.]+)s", text)
    if match:
        parallel_time = float(match.group(1))
    else:
        # Keep the 0.0 placeholder, but say so instead of recording it silently.
        print(f"WARNING: no 'Total test time' line found in {par_output}; recording 0.00")
        parallel_time = 0.0

    print(f"\n=== RESULTS FOR EXECUTION {execution_id} ===")
    print(f"Parallel time: {parallel_time:.2f} seconds")

    # Write a one-row CSV; `_ =` keeps write()'s return value out of the cell output.
    with open(par_fname, "w") as f:
        _ = f.write("workers,time\n")
        _ = f.write(f"{max_workers},{parallel_time:.2f}\n")

    print(f"=== END EXECUTION {execution_id} ===")

In [None]:
# Print the current date and time (IPython shell escape).
!date
# Long listing of the output directory, sorted by modification time, newest last.
!ls -ltr output

In [None]:
import platform

# On Linux only, record details of the machine this notebook ran on.
if platform.system().lower() == 'linux':
    # Kernel and architecture information.
    !uname -a
    # Number of installed processing units.
    !nproc --all
    # MemTotal from /proc/meminfo, divided by 1024 twice and printed with a GB suffix.
    !awk '/MemTotal/ {printf "%.2f GB\n", $2/1024/1024}' /proc/meminfo