In [1]:
# Standard Library Imports
import os
import time
import subprocess

# Third Party Imports
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
from dask import array as da
import numpy as np
from scipy import ndimage

# Local Imports


## Specify File System
- Run `umask` to check our default file-creation permissions.
- Create a fully writable `local_directory` for Zarr’s intermediate chunks (required by `map_overlap`).
- Define `data_path` for the input Zarr dataset and `save_path` for the output Zarr.

**What is `umask`?**

The “user file-creation mode mask” that determines the default permissions for **new** files and directories.

**How it works**

  1. The system starts with a _full_ default permission (octal):
     - **Files:** `666` (read/write for owner/group/others)
     - **Dirs:**  `777` (read/write/execute for owner/group/others)
  2. It then **subtracts** (bitwise) the umask value.
     - Example: umask `022` → new files get `666 − 022 = 644` (`rw-r‐-r‐-`)
                 new dirs get `777 − 022 = 755` (`rwx-r-xr-x`)

- **Viewing your mask**
  ```bash
  $ umask
  0022

In [2]:
# Figure out what your default `umask` setting is.
result = subprocess.run("umask", shell=True, capture_output=True, text=True)
print("Subprocess umask:", result.stdout.strip())

# Create temporary directory and make sure that we have write privileges.
local_directory="/project/bioinformatics/Danuser_lab/Dean/dean/dask_temp"
subprocess.run(f"mkdir -p {local_directory} && chmod -R 777 {local_directory}", shell=True)

# Location of the data.
base_path = "/archive/bioinformatics/Danuser_lab/Dean/dean/2024-05-21-tiling"
data_path = os.path.join(base_path, "cell5_fused_tp_0_ch_0.zarr")
save_path = os.path.join(base_path, 'example_4.zarr')


Subprocess umask: 0022


## Specify your cluster's operating parameters.

In [3]:
cluster_kwargs = {
    'cores': 28, # Number of threads per worker (utilizing cores within each process)
    'processes': 1, # Number of Python processes/worker.
    'memory': '220GB', # Total memory to allocate for each worker job.
    'local_directory': local_directory, #  Path for the Dask worker’s local storage (scratch space).
    'interface': 'ib0', # Network interface identifier for Dask communications. Infiniband.
    'walltime': "01:00:00", # The wall-time limit for each job, in HH:MM:SS.
    'job_name': "nanocourse", # Name for the Slurm job, publicly visible via squeue command.
    'queue': "256GB", # Slurm partition/queue to submit the jobs to
    'death_timeout': "600s", #  Timeout (in seconds) for worker survival without a scheduler connection.
    'job_extra_directives': [
        # --nodes=1 and --ntasks=1 ensure each job runs on a single node with one task
        "--nodes=1",
        "--ntasks=1",
        "--mail-type=FAIL",
        "--mail-user=kevin.dean@utsouthwestern.edu",
        "-o job_%j.out",
        "-e job_%j.err",
    ],
    'scheduler_options': {
        # A dictionary of settings passed to the Dask scheduler.
        "dashboard_address": ":9000",       # Dashboard web interface port
        "interface": "ib0",

        # Resource management
        "idle_timeout": "3600s",            # How long workers stay alive when idle (1 hour)
        "allowed_failures": 10,             # More failures allowed before worker marked as bad
    },
}


## Important `cluster_kwargs`

-  cores: 8 – Total number of CPU cores allocated per Dask worker job. With processes=1, this means the single worker process will use 8 threads (8 cores) for parallel computations ￼. This value is also used by Slurm to request 8 CPUs for the job (effectively --cpus-per-task=8 when combined with one task).
-  processes: 1 – Number of separate Python worker processes to start per job. Here 1 process will utilize all the threads/cores in the job. Using a single process is common if your tasks release the GIL or benefit from multi-threading; if tasks were pure Python (GIL-bound) or the node had many cores, you might increase this to spawn multiple smaller processes ￼ (each with cores/processes threads).


## Functions for Launching the Operation

In [7]:
def wait_for_cluster(client, number_of_workers, start_time, timeout=120):
    print("Waiting for workers to connect", end="", flush=True)

    prev_count = 0
    while True:
        curr_count = len(client.scheduler_info()['workers'])
        if time.time() - start_time > timeout:
            print("\nTimed out. Canceling.")
            break

        # if no workers yet, print a dot (once) and continue
        if curr_count == 0:
            print(".", end="", flush=True)
            prev_count = 0
            time.sleep(0.3)
            continue

        if curr_count != prev_count:
            print(f"{curr_count} workers are connected.", flush=True)
            prev_count = curr_count

        if curr_count >= number_of_workers:
            break

        time.sleep(0.3)

def launch_job(data_path, save_path, number_of_workers, cluster_kwargs):

    cluster = SLURMCluster(**cluster_kwargs)
    cluster.scale(number_of_workers+1)
    client = Client(cluster)

    start_time = time.time()
    wait_for_cluster(client, number_of_workers, start_time, timeout=120)
    print(f"Client dashboard available at: {client.dashboard_link}")

    # Load the Zarr file with Dask
    dask_data = da.from_zarr(data_path, component='0/0')
    data_shape = dask_data.shape

    # Eliminate singleton dimensions, and rechunk the data.
    dask_data = dask_data.squeeze()
    dask_data = dask_data.rechunk((32, 64, 64))

    # Process the data
    high_pass_filtered = dask_data.map_overlap(
        ndimage.gaussian_filter, sigma=3, order=0, mode="nearest", depth=40)

    low_pass_filtered = dask_data.map_overlap(
        ndimage.gaussian_filter, sigma=10, order=0, mode="nearest", depth=40)

    dog_filtered = da.map_blocks(
        np.subtract, high_pass_filtered, low_pass_filtered)

    dog_filtered.to_zarr(save_path, overwrite=True)

    # Close the client and cluster
    client.close()
    cluster.close()
    print("Client and cluster closed.")
    print(f"Total time to compute: {time.time() - start_time}")


## Port Forwarding
Dask will let you know which port the HTTP server is operating on. For example, it will state:
`Hosting the HTTP server on port 44460 instead`. The address can be found from `client.dashboard_link`.

To forward it to your local machine, run the following ssh command in your Terminal.
`ssh -N -L 44460:localhost:44460 your-cluster-login@nucleus.biohpc.swmed.edu`

You can then access this at `http://localhost:44460/status`


In [8]:
number_of_workers = 4
launch_job(data_path, save_path, number_of_workers, cluster_kwargs)

Waiting for workers to connect................
3 workers are connected.

5 workers are connected.
Client dashboard available at: http://10.100.160.4:9000/status
Client and cluster closed.
Total time to compute: 276.55252408981323


## Optimization


In [9]:
number_of_workers = 8
launch_job(data_path, save_path, number_of_workers, cluster_kwargs)

Waiting for workers to connect................
5 workers are connected.

Timed out. Canceling.
Client dashboard available at: http://10.100.160.4:9000/status
Client and cluster closed.
Total time to compute: 335.36883425712585


## Optimization
With 8 workers, and 28 cores per worker, it took 336 seconds to compute. What gives?
That’s probably a sign of overhead—extra scheduler traffic, more contention for the shared filesystem, and greater serialization costs on each map_overlap boundary.