Feature request: Allow two new optional arguments in the SSHCluster() command:
- a spec_file parameter
- a corresponding .yml file for spec_file
to allow creation of a large number of workers.
Furthermore, use of spec for the Scheduler class and the spec_file option for the Worker class should also be allowed.
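To make the proposal concrete, here is a minimal sketch of what the call might look like; spec_file is not an existing SSHCluster argument in 2022.2.x, so the keyword names below are my suggestion, purely for illustration:

from dask.distributed import SSHCluster

# Hypothetical proposed API: the spec_file keys are the new optional
# arguments being requested here, not something SSHCluster accepts today.
cluster = SSHCluster(
    ["scheduler-host", "worker-host-1", "worker-host-2"],
    scheduler_options={"spec_file": "SSHScheduler.yml"},
    worker_options={"spec_file": "SSHWorker.yml"},
)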
Presently, the 2022.2.x versions of dask and distributed use the scheduler_options and worker_options input arguments as dictionaries for constructing shell commands, which are passed as arguments to dask_spec.py to create the respective Scheduler and Worker class instances. I'm not sure about the command-line length limits on UNIX, but when I tried this in a Windows-based environment with just 4 worker-option parameters in the dictionary and n_workers set to 20, the command failed with an error to the effect of "too many characters in command line". Unfortunately, I did not get a chance to capture it, but it should be easily reproducible.
In looking through ssh.py, I found the two .join() calls used prior to .create_process(), for the Scheduler and Worker respectively, which pass the option --spec to dask_spec.py. dask_spec.py in turn also accepts another argument, --spec-file. This argument does a yaml.safe_load() on the file, creates Python dictionary objects, and instantiates the respective classes. I tested it and found it working, on a Windows Server 2019 environment at least.
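Roughly, the --spec-file path boils down to something like the following (a simplified, synchronous sketch of the mechanism, not the actual dask_spec.py implementation):

import yaml
from dask.utils import import_term

# Simplified sketch: read the spec file, resolve each "cls" string to a
# class, and instantiate it with its "opts". dask_spec.py does the real
# version of this (asynchronously, inside an event loop).
with open("SSHWorker.yml") as f:
    spec = yaml.safe_load(f)  # e.g. {0: {"cls": "distributed.Nanny", "opts": {...}}}

for name, entry in spec.items():
    cls = import_term(entry["cls"])          # e.g. resolves to distributed.Nanny
    instance = cls(**entry.get("opts", {}))

With that in mind, here is my patched Worker command in ssh.py: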
import os

# Inside distributed/deploy/ssh.py, Worker.start(): build the remote command
# from a spec file instead of inlining --spec on the command line.
spec_path = os.path.join("C:" + os.sep, "Users", "maulik", "code", "config", "SSHWorker.yml")
cmd = " ".join(
    [
        map_T_drive_cmd, "&&",
        set_env,
        self.remote_python,
        "-m",
        "distributed.cli.dask_spec",
        self.scheduler,
        "--spec-file",
        spec_path,
    ]
)
where SSHWorker.yml looks like so:
0:
  cls: distributed.Nanny
  opts:
    dashboard: true
    local_directory: dask_logs
    memory_limit: 2GB
    nthreads: 1
    port: 13000:13999
    worker_port: 12000:12999
1:
  cls: distributed.Nanny
  opts:
    dashboard: true
    local_directory: dask_logs
    memory_limit: 2GB
    nthreads: 1
    port: 13000:13999
    worker_port: 12000:12999
2:
  cls: distributed.Nanny
  opts:
    dashboard: true
    local_directory: dask_logs
    memory_limit: 2GB
    nthreads: 1
    port: 13000:13999
    worker_port: 12000:12999
To avoid creating these entries by hand, I wrote a small snippet like so:
"""
Maulik 03/03/22
In this release of DASK (2022.2.0), ssh.py generates configuration command per worker. It then passes these
command strings via CLI to another process (dask_spec.py). In Windows (and also in UNIX), there is a finite limit
to the number of characters which can be read in via STDIN. Therefore, I have found an alternative to load the
worker configuration using .yml. But to do so, the configuration must be dynamically generated
"""
ymlOptionsWorker = {
i: {
"cls": "distributed.Nanny",
"opts": {
"dashboard": True,
"nthreads": Dask_Worker_Threads,
"memory_limit": Dask_Worker_Memory_Limit,
"worker_port": Dask_Worker_Port,
"port": Dask_Worker_Nanny_Port,
"local_directory": Dask_Worker_Local_Directory
},
}
for i in range(Dask_Worker_Procs)
}
SSHWORKER_BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'config'))
SSHWORKER_FILE_NAME = os.path.join(SSHWORKER_BASE_DIR, Dask_Worker_Config_File_Name)
if (os.path.exists(SSHWORKER_FILE_NAME)):
with open(SSHWORKER_FILE_NAME, 'w') as file:
yaml.dump(ymlOptionsWorker, file)
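As a quick, purely illustrative sanity check, the dumped file should round-trip through the same yaml.safe_load() that dask_spec.py applies to --spec-file:

import yaml

# Illustrative check only: the generated spec should survive a round trip
# through yaml.safe_load, which is what dask_spec.py uses for --spec-file.
with open(SSHWORKER_FILE_NAME) as file:
    assert yaml.safe_load(file) == ymlOptionsWorker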
I did something similar for the Scheduler. The ssh.py entry:
import os

# Inside distributed/deploy/ssh.py, Scheduler.start(): point dask_spec at a
# scheduler spec file instead of passing --spec inline.
spec_path = os.path.join("C:" + os.sep, "Users", "maulik", "code", "config", "SSHScheduler.yml")
cmd = " ".join(
    [
        # map_S_drive_cmd, "&&",
        set_env,
        self.remote_python,
        "-m",
        "distributed.cli.dask_spec",
        "--spec-file",
        spec_path,
    ]
)
self.proc = await self.connection.create_process(cmd)
and the driver file has:
"""
Maulik 03/03/22
In this release of DASK (2022.2.0), ssh.py generates configuration command for scheduler. It then passes these
command strings via CLI to another process (dask_spec.py). In Windows (and also in UNIX), there is a finite limit
to the number of characters which can be read in via STDIN. It is a per- worker configuration. Therefore, I have found an alternative
to load the scheduler configuration using .yml. But to do so, the worker configuration must be dynamically generated as it
not realistic to assume for us to modify by hand, the dictionary object, every time the worker configuration needs
tweaking.
"""
ymlOptionsScheduler={
"cls": "distributed.Scheduler",
"opts": {
"port": DaskSchedulerPort,
"dashboard": True,
"dashboard_address": Dask_Scheduler_Worker_IPs[0] + ":" + str(DaskDashboardPort)
}
}
SSH_SCHEDULER_BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'config'))
SSH_SCHEDULER__FILE_NAME = os.path.join(SSH_SCHEDULER_BASE_DIR, Dask_Scheduler_Config_File_Name)
if (os.path.exists(SSH_SCHEDULER__FILE_NAME)):
with open(SSH_SCHEDULER__FILE_NAME, 'w') as file:
yaml.dump(ymlOptionsScheduler, file)
The resulting SSHScheduler.yml looks like so:
cls: distributed.Scheduler
opts:
  dashboard: true
  dashboard_address: localhost:8787
  port: 8786
Doing so allowed creation of up to 120 workers on one of the VMs I have, with 128 cores (256 hyper-threads), whereas with the presently generally available release (2022.2.x) I was not even able to create 15 workers. Hopefully the above is self-explanatory; if not, I'm happy to provide additional explanation.