In [1]:
%load_ext autoreload
%autoreload 2

# Make Dask Cluster

In [2]:
from dask_jobqueue import SLURMCluster

job_script_prologue = [
    'export PATH="/project/project_462000451/enchanted_container_lumi3/bin:$PATH"',
    'cd /project/project_462000451/enchanted-surrogates/',
    'export PYTHONPATH=$PYTHONPATH:/project/project_462000451/enchanted-surrogates/src', # NB: to use the enchanted-surrogate library
    'export PYTHONPATH=$PYTHONPATH:/project/project_462000451/DEEPlasma/GENE_ML/gene_ml', # NB: to use the enchanted-surrogate library
    'export PYTHONPATH=$PYTHONPATH:/project/project_462000451/DEEPlasma/', # NB: to use the enchanted-surrogate library
    'source /scratch/project_462000451/daniel/daniel_sprint/bin/activate'
]

#SBATCH -t 00:30:00                # wallclock limit
#SBATCH -N 1                       # total number of nodes, 2 CPUs with 64 rank each
#SBATCH --ntasks=128      # 64 per CPU (i.e. 128 per node). Additional 2 hyperthreads disabled
#SBATCH --mem=0                    # Allocate all the memory on each node
#SBATCH -p standard                # all options see: scontrol show partition
#SBATCH -A project_462000451

slurm_options = ['-t 00:30:00', '-N 1', '-p standard', '-A project_462000451', '--ntasks=128']
cluster = SLURMCluster(
    # queue='standard',
    # account="project_462000451",
    cores=1, #aka cpus-per-task
    memory="200GB",
    # walltime = "00:10:00",
    interface= "nmn0",
    processes = 1, #keep this as 1
    job_script_prologue=job_script_prologue,
    job_extra_directives = slurm_options
)
n_jobs = 4
cluster.scale(n_jobs)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34707 instead


In [3]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=187G
#SBATCH -t 00:30:00
#SBATCH -t 00:30:00
#SBATCH -N 1
#SBATCH -p standard
#SBATCH -A project_462000451
#SBATCH --ntasks=128
export PATH="/project/project_462000451/enchanted_container_lumi3/bin:$PATH"
cd /project/project_462000451/enchanted-surrogates/
export PYTHONPATH=$PYTHONPATH:/project/project_462000451/enchanted-surrogates/src
export PYTHONPATH=$PYTHONPATH:/project/project_462000451/DEEPlasma/GENE_ML/gene_ml
export PYTHONPATH=$PYTHONPATH:/project/project_462000451/DEEPlasma/
source /scratch/project_462000451/daniel/daniel_sprint/bin/activate
/project/project_462000451/enchanted_container_lumi3/bin/python -m distributed.cli.dask_worker tcp://10.252.1.81:36329 --name dummy-name --nthreads 1 --memory-limit 186.26GiB --nanny --death-timeout 60 --interface nmn0



In [4]:
from dask.distributed import Client
client = Client(cluster)

In [5]:
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: http://10.252.1.81:34707/status,

0,1
Dashboard: http://10.252.1.81:34707/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.252.1.81:36329,Workers: 0
Dashboard: http://10.252.1.81:34707/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [6]:
print(client)

<Client: 'tcp://10.252.1.81:36329' processes=0 threads=0, memory=0 B>


In [None]:
from dask.distributed import Client

# Connect to the Dask cluster
# client = Client('tcp://scheduler-address:port')

# Print client information
print(client)

# Get detailed information about workers
workers_info = client.scheduler_info()['workers']
for worker, info in workers_info.items():
    print(f"Worker: {worker}")
    print(f"  Memory: {info['memory_limit'] / 1e9:.2f} GB")
    print(f"  Cores: {info['nthreads']}")
    print(f"  Host: {info['host']}")
    print(f"  Local Directory: {info['local_directory']}")
    print('INFO KEYS',info.keys())
    print('INFO',info)
    print()

<Client: 'tcp://10.252.1.81:36329' processes=0 threads=0, memory=0 B>


# check access to file system

In [None]:
from dask.distributed import print, Client, as_completed, wait
import subprocess
import os
def ls():
    print('checking file system connection')
    subprocess.run('ls /project/project_462000451/DEEPlasma')
    print('ls', os.system('ls /project/project_462000451/DEEPlasma'))
 
new_future = client.submit(ls)
res = new_future.result()

# quick GENE runner

In [8]:
from GENE_ML.gene_ml.runners.GENErunner import GENErunner
from GENE_ML.gene_ml.parsers.GENEparser import GENE_scan_parser
from config import Config
import os
config = Config(local=True)

parser = GENE_scan_parser(config)
remote_save_name = 'gene_dask_test_really'
remote_save_dir = os.path.join(config.remote_save_base_dir,remote_save_name)
single_run_timelim = 300
single_run_simtimelim = 300
runner = GENErunner(parser, config, remote_save_dir, single_run_timelim=single_run_timelim, single_run_simtimelim=single_run_simtimelim, no_sbatch_run=True)

In [9]:
print(remote_save_dir)

/scratch/project_462000451/gene_out/gene_auto_97781/gene_dask_test_really


In [10]:
from GENE_ML.gene_ml.samplers.grid import Grid

ky = ['box-kymin']
bounds = [(0.5,0.75)]
num_samples = 4
sampler = Grid(parameters=ky, num_samples=num_samples, bounds=bounds)

# convert to form needed for my scanscript setup
samples = [{'box-kymin':[v[0]]} for v in sampler.samples_np]
for sa in samples:
    print(sa)

{'box-kymin': [0.5]}
{'box-kymin': [0.5833333333333334]}
{'box-kymin': [0.6666666666666666]}
{'box-kymin': [0.75]}


In [11]:
# create dask futures
from dask.distributed import Client, as_completed, wait

futures = []
for i, sa in enumerate(samples):
    print(sa)
    run_id = f'gene_dask_test_{i}'
    # runner.code_run(sa, run_id)
    new_future = client.submit(
        runner.code_run, sa, run_id
    )
    futures.append(new_future)



{'box-kymin': [0.5]}
{'box-kymin': [0.5833333333333334]}
{'box-kymin': [0.6666666666666666]}
{'box-kymin': [0.75]}


In [12]:
seq = as_completed(futures)
for future in seq:
    res = future.result()