From dc70b618facfe8ea14b09af8ed1ab189e4da7fb2 Mon Sep 17 00:00:00 2001 From: Bernd Schuller Date: Fri, 27 Jan 2023 15:00:34 +0100 Subject: [PATCH] dask: simpler customization, connect dashboard; docs --- docs/source/dask.rst | 166 +++++++++++++++++++++++++++++++- docs/source/port_forwarding.rst | 6 +- pyunicore/dask.py | 136 +++++++++++++++++--------- pyunicore/uftp.py | 3 +- 4 files changed, 259 insertions(+), 52 deletions(-) diff --git a/docs/source/dask.rst b/docs/source/dask.rst index 2b297a6..3386b20 100644 --- a/docs/source/dask.rst +++ b/docs/source/dask.rst @@ -1,12 +1,12 @@ Dask integration ---------------- -PyUNICORE provides an implementation of a Dask Cluster, allowing to -run the Dask client on your local host (or in a Jupyter notebook in -the Cloud), and have the Dask scheduler and workers running remotely -on the HPC site. +PyUNICORE provides the ``UNICORECluster`` class, which is an implementation +of a Dask Cluster, allowing to run the Dask client on your local host (or in +a Jupyter notebook in the Cloud), and have the Dask scheduler and workers +running remotely on the HPC site. -A basic usage example: +Here is a basic usage example: .. code:: python @@ -38,3 +38,159 @@ A basic usage example: That's it! Now Dask will run its computations using the scheduler and workers started via UNICORE on the HPC site. + + +Configuration +~~~~~~~~~~~~~ + +When creating the ``UNICORECluster``, a number of parameters can be set via the constructor. +All parameters except for the submitter to be used are OPTIONAL. 
+ +- `submitter`: this is either a Client object or an Allocation, which is used to submit new jobs +- `n_workers`: initial number of workers to launch +- `queue`: the batch queue to use +- `project`: the accounting project +- `threads`: worker option controlling the number of threads per worker +- `processes`: worker option controlling the number of worker processes per job (default: 1) +- `scheduler_job_desc`: base job description for launching the scheduler (default: None) +- `worker_job_desc`: base job description for launching a worker (default: None) +- `local_port`: which local port to use for the Dask client (default: 4322) +- `connect_dashboard`: if True, a second forwarding process will be launched to allow a connection to the dashboard + (default: False) +- `local_dashboard_port`: which local port to use for the dashboard (default: 4323) +- `debug`: if True, print some debug info (default: False) +- `connection_timeout`: timeout in seconds while setting up the port forwarding (default: 120) + + +Customizing the scheduler and workers +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +By default, the Dask extension will launch the Dask components using server-side applications +called ``dask-scheduler`` and ``dask-worker``, which need to be defined in the UNICORE IDB. + +The job description will look like this: + +.. code:: json + + { + "ApplicationName": "dask-scheduler", + "Arguments": [ + "--port", "0", + "--scheduler-file", "./dask.json" + ], + "Resources": { + "Queue": "your_queue", + "Project": "your_project" + } + } + +If you want to customize this, you can pass in a basic job description when creating +the ``UNICORECluster`` object. + +The job descriptions need not contain all command-line arguments, the ``UNICORECluster`` +will add them as required. Also, the queue and project will be set if necessary. + + +For example + +.. 
code:: python + + # Custom job to start scheduler + + sched_jd = { + "Executable" : "conda run -n dask dask-scheduler", + "Resources": { + "Runtime": "2h" + }, + "Tags": ["dask", "testing"] + } + + # Custom job to start worker + + worker_jd = { + "Executable" : "srun --tasks=1 conda run -n dask dask-worker", + "Resources": { + "Nodes": "2" + } + } + + # Create the UNICORECluster instance + uc_cluster = uc_dask.UNICORECluster( + submitter, + queue = "batch", + project = "my-project", + scheduler_job_desc=sched_jd, + worker_job_desc=worker_jd + ) + + +Scaling +~~~~~~~ + +To control the number of worker processes and threads, the UNICORECluster has the scale() method, +as well as two properties that can be set from the constructor, or later at runtime. + +The scale() method controls how many workers (or worker jobs when using "jobs=..." as argument) +are running. + +.. code:: python + + # Start two workers + uc_cluster.scale(2, wait_for_startup=True) + + # Or start two worker jobs with 4 workers per job + # and 128 threads per worker + uc_cluster.processes = 4 + uc_cluster.threads = 128 + uc_cluster.scale(jobs=2) + +The dashboard +~~~~~~~~~~~~~ + +By default a connection to the scheduler's dashboard is not possible. To allow connecting to +the dashboard, set ``connect_dashboard=True`` when creating the ``UNICORECluster``. +The dashboard will then be available at ``http://localhost:4323``, the port can be changed, +if necessary. + + +Using an allocation +~~~~~~~~~~~~~~~~~~~ + +To speed up the startup and scaling process, it is possible to pre-allocate a multinode batch job +(if the server side UNICORE supports this, i.e. runs UNICORE 9.1 and Slurm), and run the Dask +components in this allocation. + +.. 
code:: python + + import pyunicore.client as uc_client + import pyunicore.credentials as uc_credentials + import pyunicore.dask as uc_dask + + # Create a UNICORE client for accessing the HPC cluster + base_url = "https://localhost:8080/DEMO-SITE/rest/core" + credential = uc_credentials.UsernamePassword("demouser", "test123") + submitter = uc_client.Client(credential, base_url) + + # Allocate a 4-node job + allocation_jd = { + "Job type": "ALLOCATE", + + "Resources": { + "Runtime": "60m", + "Queue": "batch", + "Project": "myproject" + } + } + + allocation = submitter.new_job(allocation_jd) + allocation.wait_until_available() + + # Create the UNICORECluster instance using the allocation + + uc_cluster = uc_dask.UNICORECluster(allocation, debug=True) + + +Note that in this case your custom scheduler / worker job descriptions MUST use ``srun --tasks=1 ...`` +to make sure that exactly one scheduler / worker is started on one node. + +Also make sure to not launch more jobs than you have nodes - otherwise the new jobs will stay "QUEUED". diff --git a/docs/source/port_forwarding.rst b/docs/source/port_forwarding.rst index 9fdb5ae..3e3aee8 100644 --- a/docs/source/port_forwarding.rst +++ b/docs/source/port_forwarding.rst @@ -7,8 +7,8 @@ This feature requires UNICORE 9.1.0 or later on the server side. You can use this feature in two ways - * in your own applications via the `pyunicore.client.Job` class. - * you can also open a tunnel from the command line using the 'pyunicore.forwarder' module + * in your own applications via the ``pyunicore.client.Job`` class. 
+ * you can also open a tunnel from the command line using the ``pyunicore.forwarder`` module Here is an example for a command line tool invocation: @@ -22,7 +22,7 @@ Here is an example for a command line tool invocation: $JOB_URL/forward-port?port=REMOTE_PORT \ -Your application can now connect to "localhost:4322" but all traffic +Your application can now connect to ``localhost:4322`` but all traffic will be forwarded to port 8000 on the login node. See diff --git a/pyunicore/dask.py b/pyunicore/dask.py index 6100bd0..2564bd9 100644 --- a/pyunicore/dask.py +++ b/pyunicore/dask.py @@ -14,7 +14,29 @@ class UNICORECluster(Cluster): - """Deploy Dask on a HPC site via UNICORE""" + """Deploy Dask on a HPC site via UNICORE + + This class will launch a job for the Dask scheduler, and one or more jobs for + running Dask workers. It supports scale() method to adapt the number of workers. + + Args: + + submitter: this is either a Client object or an Allocation, which is used + to submit new jobs + n_workers: initial number of workers to launch + queue: the batch queue to use + project: the accounting project + threads: worker option controlling the number of threads per worker + processes: worker option controlling the number of worker processes per job + scheduler_job_desc: base job description for launching the scheduler + worker_job_desc: base job description for launching a worker + local_port: which local port to use for the Dask client (must be a free port) + connect_dashboard: if True, a second forwarding process will be launched + to allow a connection to the dashboard + local_dashboard_port: which local port to use for the dashboard (must be a free port) + debug: if True, print some debug info + connection_timeout: timeout in seconds while setting up the port forwarding + """ def __init__( self, @@ -24,11 +46,13 @@ def __init__( asynchronous=False, queue=None, project=None, - threads_per_process=None, - processes_per_job=1, - scheduler_options={}, - 
worker_options={}, + threads=None, + processes=1, + scheduler_job_desc={}, + worker_job_desc={}, local_port=4322, + connect_dashboard=False, + local_dashboard_port=4323, debug=False, connection_timeout=120, ): @@ -37,14 +61,17 @@ def __init__( self.status = Status.created self.debug = debug self.local_port = local_port + self.connect_dashboard = connect_dashboard + self.local_dashboard_port = local_dashboard_port self.queue = queue self.project = project - self.scheduler_options = scheduler_options - self.worker_options = worker_options - self.threads = threads_per_process - self.processes = processes_per_job + self.scheduler_job_desc = scheduler_job_desc + self.worker_job_desc = worker_job_desc + self.threads = threads + self.processes = processes self.worker_jobs = [] self.forwarding_process = None + self.db_forwarding_process = None try: self._start_scheduler() self._start_forwarder() @@ -66,27 +93,25 @@ def get_scheduler_job_description(self): """creates the JSON job description for starting the scheduler Returns JSON and optional array of input files """ - job_start_scheduler = self.scheduler_options.get( - "executable", {"ApplicationName": "dask-scheduler"} - ) - self.scheduler_port = self.scheduler_options.get("port", 0) - job_start_scheduler["Arguments"] = [ - "--port", - str(self.scheduler_port), - "--scheduler-file", - "./dask.json", - ] - additional_args = self.scheduler_options.get("additional_args", []) - for arg in additional_args: - job_start_scheduler["Arguments"].append(arg) - resources = self.scheduler_options.get("Resources", {}) + job = self.scheduler_job_desc + if job.get("ApplicationName") is None and job.get("Executable") is None: + job["ApplicationName"] = "dask-scheduler" + args = job.get("Arguments", []) + if "--port" not in args: + args.append("--port") + args.append("0") + if "--scheduler-file" not in args: + args.append("--scheduler-file") + args.append("./dask.json") + job["Arguments"] = args + resources = job.get("Resources", {}) if 
self.queue is not None: resources["Queue"] = self.queue if self.project is not None: resources["Project"] = self.project if len(resources) > 0: - job_start_scheduler["Resources"] = resources - return job_start_scheduler, [] + job["Resources"] = resources + return job, [] def _start_scheduler(self): """ @@ -94,17 +119,19 @@ def _start_scheduler(self): """ job_desc, inputs = self.get_scheduler_job_description() job = self.submitter.new_job(job_desc, inputs) - self.scheduler_job = job not self.debug or print("Submitted scheduler", job) not self.debug or print("Waiting for scheduler to start up...") job.poll(JobStatus.RUNNING) if JobStatus.FAILED == job.status: raise OSError("Launching scheduler failed") not self.debug or print("Scheduler is running.") + self.scheduler_job = job self.scheduler_host, self.scheduler_port = self._read_scheduler_address() def _read_scheduler_address(self): - """reads scheduler host/port from dask.json file in the scheduler's working directory""" + """reads scheduler host/port from dask.json file in the scheduler's working directory. + Also reads dashboard port, if needed. 
+ """ not self.debug or print("Reading scheduler host/port...") wd = self.scheduler_job.working_dir while True: @@ -116,34 +143,39 @@ def _read_scheduler_address(self): time.sleep(2) dask_json = json.loads(json_file.raw().read()) _h, _p = urlparse(dask_json["address"]).netloc.split(":") + if self.connect_dashboard: + try: + self.dashboard_port = int(dask_json["services"]["dashboard"]) + except KeyError: + self.connect_dask = False return _h, int(_p) def get_worker_job_description(self): """creates the JSON job description for starting a worker Returns JSON and optional array of input files """ - job_start_worker = self.worker_options.get("executable", {"ApplicationName": "dask-worker"}) - job_start_worker["Arguments"] = [ - "--scheduler-file", - "../%s/dask.json" % self.scheduler_job.job_id, - ] + job = self.worker_job_desc + if job.get("ApplicationName") is None and job.get("Executable") is None: + job["ApplicationName"] = "dask-worker" + args = job.get("Arguments", []) + if "--scheduler-file" not in args: + args.append("--scheduler-file") + args.append("../%s/dask.json" % self.scheduler_job.job_id) if self.processes: - job_start_worker["Arguments"].append("--nworkers") - job_start_worker["Arguments"].append(str(self.processes)) + args.append("--nworkers") + args.append(str(self.processes)) if self.threads: - job_start_worker["Arguments"].append("--nthreads") - job_start_worker["Arguments"].append(str(self.threads)) - additional_args = self.worker_options.get("additional_args", []) - for arg in additional_args: - job_start_worker["Arguments"].append(arg) - resources = self.worker_options.get("Resources", {}) + args.append("--nthreads") + args.append(str(self.threads)) + job["Arguments"] = args + resources = job.get("Resources", {}) if self.queue is not None: resources["Queue"] = self.queue if self.project is not None: resources["Project"] = self.project if len(resources) > 0: - job_start_worker["Resources"] = resources - return job_start_worker, [] + 
job["Resources"] = resources + return job, [] def _submit_worker_job(self): """ @@ -188,6 +220,19 @@ def _start_forwarder(self): ctx = get_context("spawn") self.forwarding_process = ctx.Process(target=self.forwarder.run, args=[self.local_port]) self.forwarding_process.start() + if self.connect_dashboard: + self.db_forwarder = Forwarder( + tr, + endpoint, + service_port=self.dashboard_port, + service_host=self.scheduler_host, + login_node=None, + debug=self.debug, + ) + self.db_forwarding_process = ctx.Process( + target=self.db_forwarder.run, args=[self.local_dashboard_port] + ) + self.db_forwarding_process.start() def cleanup(self): if self.scheduler_job: @@ -196,6 +241,8 @@ def cleanup(self): worker.abort() if self.forwarding_process: self.forwarding_process.kill() + if self.db_forwarding_process: + self.db_forwarding_process.kill() def close(self): self.cleanup() @@ -203,4 +250,7 @@ def close(self): @property def dashboard_link(self): - return None + if self.connect_dashboard: + return "localhost:%s" % str(self.local_dashboard_port) + else: + return None diff --git a/pyunicore/uftp.py b/pyunicore/uftp.py index da91fe6..e1c4e7a 100644 --- a/pyunicore/uftp.py +++ b/pyunicore/uftp.py @@ -127,7 +127,8 @@ def set_time(self, mtime, path): raise OSError("Could not set time: " % reply) def close(self): - self.ftp.close() + if self.ftp is not None: + self.ftp.close() def get_write_socket(self, path, offset): path = self.normalize(path)