diff --git a/dask_jobqueue/core.py b/dask_jobqueue/core.py index c5625327..c2f2b1b2 100644 --- a/dask_jobqueue/core.py +++ b/dask_jobqueue/core.py @@ -2,6 +2,7 @@ import logging import math +import os import re import shlex import subprocess @@ -151,6 +152,7 @@ def __init__(self, local_directory=None, extra=None, env_extra=None, + log_directory=None, walltime=None, threads=None, **kwargs @@ -183,6 +185,8 @@ def __init__(self, extra = dask.config.get('jobqueue.%s.extra' % self.scheduler_name) if env_extra is None: env_extra = dask.config.get('jobqueue.%s.env-extra' % self.scheduler_name) + if log_directory is None: + log_directory = dask.config.get('jobqueue.%s.log-directory' % self.scheduler_name) if dask.config.get('jobqueue.%s.threads', None): warnings.warn(threads_deprecation_message) @@ -241,6 +245,11 @@ def __init__(self, if extra is not None: self._command_template += extra + self.log_directory = log_directory + if self.log_directory is not None: + if not os.path.exists(self.log_directory): + os.makedirs(self.log_directory) + def __repr__(self): running_workers = sum(len(value) for value in self.running_jobs.values()) running_cores = running_workers * self.worker_threads diff --git a/dask_jobqueue/jobqueue.yaml b/dask_jobqueue/jobqueue.yaml index fbb960c3..0e93db88 100644 --- a/dask_jobqueue/jobqueue.yaml +++ b/dask_jobqueue/jobqueue.yaml @@ -19,6 +19,7 @@ jobqueue: env-extra: [] resource-spec: null job-extra: [] + log-directory: null pbs: name: dask-worker @@ -40,6 +41,7 @@ jobqueue: env-extra: [] resource-spec: null job-extra: [] + log-directory: null sge: name: dask-worker @@ -59,6 +61,7 @@ jobqueue: walltime: '00:30:00' extra: "" env-extra: [] + log-directory: null resource-spec: null @@ -83,6 +86,7 @@ jobqueue: job-cpu: null job-mem: null job-extra: {} + log-directory: null moab: name: dask-worker @@ -104,6 +108,7 @@ jobqueue: env-extra: [] resource-spec: null job-extra: [] + log-directory: null lsf: name: dask-worker @@ -126,3 +131,4 @@ jobqueue: ncpus: null mem: null job-extra: [] + log-directory: null diff --git a/dask_jobqueue/lsf.py b/dask_jobqueue/lsf.py index 7e2cc210..2e22325e 100644 --- a/dask_jobqueue/lsf.py +++ b/dask_jobqueue/lsf.py @@ -73,8 +73,11 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None # LSF header build if self.name is not None: header_lines.append('#BSUB -J %s' % self.name) - header_lines.append('#BSUB -e %s.err' % self.name) - header_lines.append('#BSUB -o %s.out' % self.name) + if self.log_directory is not None: + header_lines.append('#BSUB -e %s/%s-%%J.err' % + (self.log_directory, self.name or 'worker')) + header_lines.append('#BSUB -o %s/%s-%%J.out' % + (self.log_directory, self.name or 'worker')) if queue is not None: header_lines.append('#BSUB -q %s' % queue) if project is not None: diff --git a/dask_jobqueue/pbs.py b/dask_jobqueue/pbs.py index 083b2c01..bc64702d 100644 --- a/dask_jobqueue/pbs.py +++ b/dask_jobqueue/pbs.py @@ -93,6 +93,9 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, header_lines.append('#PBS -l %s' % resource_spec) if walltime is not None: header_lines.append('#PBS -l walltime=%s' % walltime) + if self.log_directory is not None: + header_lines.append('#PBS -e %s/' % self.log_directory) + header_lines.append('#PBS -o %s/' % self.log_directory) header_lines.extend(['#PBS %s' % arg for arg in job_extra]) header_lines.append('JOB_ID=${PBS_JOBID%.*}') diff --git a/dask_jobqueue/sge.py b/dask_jobqueue/sge.py index 1346b972..cc7164a4 100644 --- a/dask_jobqueue/sge.py +++ b/dask_jobqueue/sge.py @@ -66,6 +66,9 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, header_lines.append('#$ -l %(resource_spec)s') if walltime is not None: header_lines.append('#$ -l h_rt=%(walltime)s') + if self.log_directory is not None: + header_lines.append('#$ -e %(log_directory)s/') + header_lines.append('#$ -o %(log_directory)s/') header_lines.extend(['#$ -cwd', '#$ -j y']) header_template = '\n'.join(header_lines) @@ -74,7 +77,8 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, 'project': project, 'processes': self.worker_processes, 'walltime': walltime, - 'resource_spec': resource_spec} + 'resource_spec': resource_spec, + 'log_directory': self.log_directory} self.job_header = header_template % config logger.debug("Job script: \n %s" % self.job_script()) diff --git a/dask_jobqueue/slurm.py b/dask_jobqueue/slurm.py index 29dc2352..7e0dcb90 100644 --- a/dask_jobqueue/slurm.py +++ b/dask_jobqueue/slurm.py @@ -73,8 +73,11 @@ def __init__(self, queue=None, project=None, walltime=None, job_cpu=None, job_me # SLURM header build if self.name is not None: header_lines.append('#SBATCH -J %s' % self.name) - header_lines.append('#SBATCH -e %s.err' % self.name) - header_lines.append('#SBATCH -o %s.out' % self.name) + if self.log_directory is not None: + header_lines.append('#SBATCH -e %s/%s-%%J.err' % + (self.log_directory, self.name or 'worker')) + header_lines.append('#SBATCH -o %s/%s-%%J.out' % + (self.log_directory, self.name or 'worker')) if queue is not None: header_lines.append('#SBATCH -p %s' % queue) if project is not None: diff --git a/dask_jobqueue/tests/test_jobqueue_core.py b/dask_jobqueue/tests/test_jobqueue_core.py index bc0c9260..e76c5bfa 100644 --- a/dask_jobqueue/tests/test_jobqueue_core.py +++ b/dask_jobqueue/tests/test_jobqueue_core.py @@ -1,6 +1,8 @@ from __future__ import absolute_import, division, print_function +import os import pytest +import shutil import socket from dask_jobqueue import (JobQueueCluster, PBSCluster, MoabCluster, @@ -79,3 +81,13 @@ def test_job_id_error_handling(Cluster): return_string = 'Job <12345> submited to .' cluster.job_id_regexp = r'(\d+)' cluster._job_id_from_submit_output(return_string) + + +def test_log_directory(tmpdir): + shutil.rmtree(tmpdir.strpath, ignore_errors=True) + with PBSCluster(cores=1, memory='1GB'): + assert not os.path.exists(tmpdir.strpath) + + with PBSCluster(cores=1, memory='1GB', + log_directory=tmpdir.strpath): + assert os.path.exists(tmpdir.strpath)