diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index 46d91056..d608b167 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -445,7 +445,7 @@ def stop_jobs(self, jobs): logger.debug("Stopping jobs: %s", jobs) if jobs: jobs = list(jobs) - self._call([self.cancel_command] + list(set(jobs))) + self._call(shlex.split(self.cancel_command) + list(set(jobs))) # if any of these jobs were pending, we should remove those now for job_id in jobs: diff --git a/dask_jobqueue/tests/test_sge.py b/dask_jobqueue/tests/test_sge.py index 8ce3b005..a6bba110 100644 --- a/dask_jobqueue/tests/test_sge.py +++ b/dask_jobqueue/tests/test_sge.py @@ -16,7 +16,6 @@ def test_basic(loop): with SGECluster( walltime="00:02:00", cores=8, processes=4, memory="2GB", loop=loop ) as cluster: - print(cluster.job_script()) with Client(cluster, loop=loop) as client: cluster.scale(2) @@ -103,3 +102,26 @@ def test_job_script(tmpdir): "#$ -j y", ]: assert each in job_script + + +@pytest.mark.env("sge") +def test_complex_cancel_command(loop): + with SGECluster( + walltime="00:02:00", cores=1, processes=1, memory="2GB", loop=loop + ) as cluster: + username = "root" + cluster.cancel_command = "qdel -u {}".format(username) + + cluster.scale(2) + + start = time() + while not cluster.running_jobs: + sleep(0.100) + assert time() < start + QUEUE_WAIT + + cluster.stop_all_jobs() + + start = time() + while cluster.running_jobs: + sleep(0.100) + assert time() < start + QUEUE_WAIT