
Commit

Merge pull request #417 from ReactionMechanismGenerator/PBS
Adding support for PBS cluster software
alongd committed Oct 2, 2020
2 parents fba1e60 + 6d802af commit d0ef6a8
Showing 6 changed files with 182 additions and 73 deletions.
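The changes in arc/job/job.py below expect the PBS submission script to redirect stdout and stderr to out.txt and err.txt via -o and -e directives. As a rough sketch only (the actual templates live in arc/job/submit.py and are not part of the excerpt shown here), such a PBS template stored as a Python string might look like the following; the queue name, resource line, and placeholder names are illustrative assumptions, not the template shipped with this PR:

# Hypothetical PBS submit-script template (illustrative sketch only).
pbs_submit_template = """#!/bin/bash -l
#PBS -N {name}
#PBS -q workq
#PBS -l select=1:ncpus={cpus}:mem={memory}mb
#PBS -l walltime={t_max}
#PBS -o out.txt
#PBS -e err.txt

cd $PBS_O_WORKDIR
{commands}
"""

The '#PBS -o out.txt' and '#PBS -e err.txt' lines are what _get_additional_job_info() relies on when it downloads out.txt and err.txt from the server.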
47 changes: 16 additions & 31 deletions arc/job/job.py
@@ -1374,11 +1374,13 @@ def determine_job_status(self):
def _get_additional_job_info(self):
"""
Download the additional information of stdout and stderr from the server.
stdout and stderr are named out.txt and err.txt, respectively.
The submission script in submit.py should contain the -o and -e flags.
"""
lines1, lines2 = list(), list()
content = ''
cluster_soft = servers[self.server]['cluster_soft'].lower()
if cluster_soft in ['oge', 'sge']:
if cluster_soft in ['oge', 'sge', 'slurm', 'pbs']:
local_file_path1 = os.path.join(self.local_path, 'out.txt')
local_file_path2 = os.path.join(self.local_path, 'err.txt')
if self.server != 'local':
@@ -1388,13 +1390,17 @@ def _get_additional_job_info(self):
ssh.download_file(remote_file_path=remote_file_path,
local_file_path=local_file_path1)
except (TypeError, IOError) as e:
logger.warning(f'Got the following error when trying to download out.txt for {self.job_name}:')
logger.warning(f'Got the following error when trying to download out.txt for {self.job_name}: '
f'Please check that the submission script contains a -o flag '
f'with stdout named out.txt (e.g., "#SBATCH -o out.txt").')
logger.warning(e)
remote_file_path = os.path.join(self.remote_path, 'err.txt')
try:
ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path2)
except (TypeError, IOError) as e:
logger.warning(f'Got the following error when trying to download err.txt for {self.job_name}:')
logger.warning(f'Got the following error when trying to download err.txt for {self.job_name}: '
f'Please check that the submission script contains a -e flag '
f'with stderr named err.txt (e.g., "#SBATCH -e err.txt").')
logger.warning(e)
if os.path.isfile(local_file_path1):
with open(local_file_path1, 'r') as f:
@@ -1405,32 +1411,10 @@ def _get_additional_job_info(self):
content += ''.join([line for line in lines1])
content += '\n'
content += ''.join([line for line in lines2])
elif cluster_soft == 'slurm':
else:
if self.server != 'local':
with SSHClient(self.server) as ssh:
response = ssh.list_dir(remote_path=self.remote_path)
else:
response = execute_command('ls -alF {0}'.format(self.local_path))
files = list()
for line in response[0]:
files.append(line.split()[-1])
for file_name in files:
if 'slurm' in file_name and '.out' in file_name:
local_file_path = os.path.join(self.local_path, file_name)
if self.server != 'local':
remote_file_path = os.path.join(self.remote_path, file_name)
try:
with SSHClient(self.server) as ssh:
ssh.download_file(remote_file_path=remote_file_path,
local_file_path=local_file_path)
except (TypeError, IOError) as e:
logger.warning(f'Got the following error when trying to download {file_name} '
f'for {self.job_name}: {e}')
if os.path.isfile(local_file_path):
with open(local_file_path, 'r') as f:
lines1 = f.readlines()
content += ''.join([line for line in lines1])
content += '\n'
raise ValueError(f'Unrecognized cluster software: {cluster_soft}')

return content

def _check_job_server_status(self):
@@ -1525,12 +1509,13 @@ def set_cpu_and_mem(self):
# determine amount of memory in submit script based on cluster job scheduling system
cluster_software = servers[self.server].get('cluster_soft').lower()
if cluster_software in ['oge', 'sge']:
# In SGE, `-l h_vmem=5000M` specify the amount of maximum memory required per cpu (all cores) to be 5000 MB.
# In SGE, `-l h_vmem=5000M` specifies the amount of maximum memory required per cpu (all cores) to be 5000 MB.
self.submit_script_memory = math.ceil(total_submit_script_memory) # MB
elif cluster_software in ['slurm']:
# In Slurm, `#SBATCH --mem-per-cpu={2000}` specify the amount of memory required per cpu core to be 2000 MB.
# In Slurm, `#SBATCH --mem-per-cpu={2000}` specifies the amount of memory required per cpu core to be 2000 MB.
self.submit_script_memory = math.ceil(total_submit_script_memory / self.cpu_cores) # MB
elif cluster_software in ['pbs']:
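# For PBS, ARC likewise computes the submit script memory per cpu core (MB), mirroring the Slurm branch above.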
self.submit_script_memory = math.ceil(total_submit_script_memory / self.cpu_cores) # MB

# determine amount of memory in job input file based on ESS
if self.software.lower() in ['molpro', 'terachem']:
# Molpro's and TeraChem's memory is per cpu core and in MW (mega word; 1 MW ~= 8 MB; 1 GB = 128 MW)
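To make the scheduler differences in set_cpu_and_mem() concrete, here is a tiny standalone sketch of the memory arithmetic; the numbers are made up for illustration, not ARC defaults. SGE receives the total across all cores via h_vmem, while Slurm (--mem-per-cpu) and the new PBS branch receive a per-core value.

import math

total_submit_script_memory = 16000  # MB requested for the whole job (illustrative value)
cpu_cores = 8                       # illustrative value

sge_memory = math.ceil(total_submit_script_memory)                    # SGE h_vmem: 16000 MB for all cores
per_core_memory = math.ceil(total_submit_script_memory / cpu_cores)  # Slurm/PBS branches: 2000 MB per core
print(sge_memory, per_core_memory)  # 16000 2000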
38 changes: 28 additions & 10 deletions arc/job/local.py
@@ -120,6 +120,13 @@ def check_job_status(job_id):
14428 debug xq1371m2 user_name R 50-04:04:46 1 node06
PBS (taken from zeldo.dow.com)::
Req'd Req'd Elap
Job ID Username Queue Jobname SessID NDS TSK Memory Time S Time
----------------------- ----------- -------- ---------------- ------ ----- ------ --------- --------- - ---------
2016614.zeldo.local u780444 workq scan.pbs 75380 1 10 -- 730:00:00 R 00:00:20
2016616.zeldo.local u780444 workq scan.pbs 75380 1 10 -- 730:00:00 R 00:00:20
"""
server = 'local'
cmd = check_status_command[servers[server]['cluster_soft']] + ' -u $USER'
@@ -153,9 +160,13 @@ def check_running_jobs_ids():
cmd = check_status_command[servers['local']['cluster_soft']] + ' -u $USER'
stdout = execute_command(cmd)[0]
for i, status_line in enumerate(stdout):
if (servers['local']['cluster_soft'].lower() == 'slurm' and i > 0)\
or (servers['local']['cluster_soft'].lower() == 'oge' and i > 1):
if servers['local']['cluster_soft'].lower() == 'slurm' and i > 0:
running_jobs_ids.append(int(status_line.split()[0]))
elif servers['local']['cluster_soft'].lower() in ['oge', 'sge'] and i > 1:
running_jobs_ids.append(int(status_line.split()[0]))
elif servers['local']['cluster_soft'].lower() == 'pbs' and i > 4:
running_jobs_ids.append(int(status_line.split('.')[0]))

return running_jobs_ids


@@ -169,14 +180,17 @@ def submit_job(path):
cmd = 'cd ' + path + '; ' + submit_command[servers['local']['cluster_soft']] + ' '\
+ submit_filename[servers['local']['cluster_soft']]
stdout = execute_command(cmd)[0]
if 'submitted' in stdout[0].lower():
if servers['local']['cluster_soft'].lower() in ['oge', 'sge'] and 'submitted' in stdout[0].lower():
job_id = int(stdout[0].split()[2])
job_status = 'running'
elif servers['local']['cluster_soft'].lower() == 'slurm' and 'submitted' in stdout[0].lower():
job_id = int(stdout[0].split()[3])
job_status = 'running'
if servers['local']['cluster_soft'].lower() == 'oge':
job_id = int(stdout[0].split()[2])
elif servers['local']['cluster_soft'].lower() == 'slurm':
job_id = int(stdout[0].split()[3])
else:
raise ValueError('Unrecognized cluster software {0}'.format(servers['local']['cluster_soft']))
elif servers['local']['cluster_soft'].lower() == 'pbs':
job_id = int(stdout[0].split('.')[0])
job_status = 'running'
else:
raise ValueError('Unrecognized cluster software {0}'.format(servers['local']['cluster_soft']))
return job_status, job_id


@@ -231,7 +245,11 @@ def delete_all_local_arc_jobs(jobs: Optional[List[Union[str, int]]] = None):
server_job_id = status_line.split()[0]
delete_job(server_job_id)
print(f'deleted job {job_id} ({server_job_id} on server)')
elif servers[server]['cluster_soft'].lower() == 'oge':
elif servers[server]['cluster_soft'].lower() == 'pbs':
server_job_id = status_line.split()[0]
delete_job(server_job_id)
print(f'deleted job {job_id} ({server_job_id} on server)')
elif servers[server]['cluster_soft'].lower() in ['oge', 'sge']:
delete_job(job_id)
print(f'deleted job {job_id}')
print('\ndone.')
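The i > 4 offset and split('.') in the PBS branches of check_running_jobs_ids() reflect the qstat layout shown in the docstring: roughly five header lines precede the job rows, and each job identifier carries a host suffix (2016614.zeldo.local) whose numeric prefix is the id. A self-contained sketch of that parsing, with the header line count and sample rows assumed from the docstring example:

# Hedged sketch of the PBS parsing in check_running_jobs_ids(); sample output adapted from the docstring.
qstat_stdout = [
    'zeldo.local:',
    '',
    "                                                          Req'd  Req'd   Elap",
    'Job ID                Username  Queue  Jobname   SessID NDS TSK Memory Time      S Time',
    '--------------------- --------- ------ --------- ------ --- --- ------ --------- - --------',
    '2016614.zeldo.local   u780444   workq  scan.pbs   75380   1  10    --  730:00:00 R 00:00:20',
    '2016616.zeldo.local   u780444   workq  scan.pbs   75380   1  10    --  730:00:00 R 00:00:20',
]
running_jobs_ids = [int(line.split('.')[0]) for i, line in enumerate(qstat_stdout) if i > 4]
print(running_jobs_ids)  # [2016614, 2016616]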
62 changes: 37 additions & 25 deletions arc/job/ssh.py
@@ -275,9 +275,12 @@ def check_running_jobs_ids(self) -> list:
cmd = check_status_command[servers[self.server]['cluster_soft']] + ' -u $USER'
stdout = self._send_command_to_server(cmd)[0]
for i, status_line in enumerate(stdout):
if (servers[self.server]['cluster_soft'].lower() == 'slurm' and i > 0)\
or (servers[self.server]['cluster_soft'].lower() == 'oge' and i > 1):
if servers[self.server]['cluster_soft'].lower() == 'slurm' and i > 0:
running_jobs_ids.append(int(status_line.split()[0]))
elif servers[self.server]['cluster_soft'].lower() in ['oge', 'sge'] and i > 1:
running_jobs_ids.append(int(status_line.split()[0]))
elif servers[self.server]['cluster_soft'].lower() == 'pbs' and i > 4:
running_jobs_ids.append(int(status_line.split('.')[0]))
return running_jobs_ids

def submit_job(self, remote_path: str) -> Tuple[str, int]:
Expand Down Expand Up @@ -305,14 +308,17 @@ def submit_job(self, remote_path: str) -> Tuple[str, int]:
if 'Requested node configuration is not available' in line:
logger.warning(f'User may be requesting more resources than are available. Please check server '
f'settings, such as cpus and memory, in ARC/arc/settings/settings.py')
elif 'submitted' in stdout[0].lower():
elif cluster_soft.lower() in ['oge', 'sge'] and 'submitted' in stdout[0].lower():
job_id = int(stdout[0].split()[2])
job_status = 'running'
if cluster_soft.lower() == 'oge':
job_id = int(stdout[0].split()[2])
elif cluster_soft.lower() == 'slurm':
job_id = int(stdout[0].split()[3])
else:
raise ValueError(f'Unrecognized cluster software: {cluster_soft}')
elif cluster_soft.lower() == 'slurm' and 'submitted' in stdout[0].lower():
job_id = int(stdout[0].split()[3])
job_status = 'running'
elif cluster_soft.lower() == 'pbs':
job_id = int(stdout[0].split('.')[0])
job_status = 'running'
else:
raise ValueError(f'Unrecognized cluster software: {cluster_soft}')
return job_status, job_id

def connect(self) -> None:
@@ -423,7 +429,7 @@ def list_available_nodes(self) -> list:
cluster_soft = servers[self.server]['cluster_soft']
cmd = list_available_nodes_command[cluster_soft]
stdout = self._send_command_to_server(command=cmd)[0]
if cluster_soft.lower() == 'oge':
if cluster_soft.lower() in ['oge', 'sge']:
# Stdout line example:
# long1@node01.cluster BIP 0/0/8 -NA- lx24-amd64 aAdu
nodes = [line.split()[0].split('@')[1]
@@ -433,6 +439,8 @@ def list_available_nodes(self) -> list:
# node01 alloc 1.00 none
nodes = [line.split()[0] for line in stdout
if line.split()[1] in ['mix', 'alloc', 'idle']]
elif cluster_soft.lower() == 'pbs':
raise ValueError(f'Listing available nodes is not yet implemented for cluster software {cluster_soft}, please modify arc/job/ssh.py, list_available_nodes().')
return nodes

def change_mode(self,
@@ -525,22 +533,26 @@ def check_job_status_in_stdout(job_id: int,
break
else:
return 'done'
status = status_line.split()[4]
if status.lower() in ['r', 'qw', 't']:
return 'running'
if servers[server]['cluster_soft'].lower() == 'slurm':
status = status_line.split()[4]
if status.lower() in ['r', 'qw', 't', 'cg']:
return 'running'
elif status.lower() in ['bf', 'ca', 'f', 'nf', 'st', 'oom']:
return 'errored'
elif servers[server]['cluster_soft'].lower() == 'pbs':
status = status_line.split()[-2]
if status.lower() in ['r', 'q', 'c', 'e', 'w']:
return 'running'
elif status.lower() in ['h', 's']:
return 'errored'
elif servers[server]['cluster_soft'].lower() in ['oge', 'sge']:
status = status_line.split()[4]
if status.lower() in ['r', 'qw', 't']:
return 'running'
elif status.lower() in ['e',]:
return 'errored'
else:
if servers[server]['cluster_soft'].lower() == 'oge':
if '.cluster' in status_line:
try:
return 'errored on node ' + status_line.split()[-1].split('@')[1].split('.')[0][-2:]
except IndexError:
return 'errored'
else:
return 'errored'
elif servers[server]['cluster_soft'].lower() == 'slurm':
return 'errored on node ' + status_line.split()[-1][-2:]
else:
raise ValueError(f'Unknown cluster software {servers[server]["cluster_soft"]}')
raise ValueError(f'Unknown cluster software {servers[server]["cluster_soft"]}')


def delete_all_arc_jobs(server_list: list,
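Because qsub on PBS typically echoes only the job identifier (e.g., 2016614.zeldo.local), while SGE and Slurm print a sentence containing 'submitted', submit_job() now branches on the scheduler before extracting the id. A minimal standalone sketch of that extraction follows; the example stdout lines are assumptions for illustration, not captured output:

def parse_job_id(first_stdout_line: str, cluster_soft: str) -> int:
    """Sketch of the job-id extraction used in submit_job() for each scheduler."""
    cluster_soft = cluster_soft.lower()
    if cluster_soft in ['oge', 'sge']:
        # e.g., 'Your job 123456 ("submit.sh") has been submitted'
        return int(first_stdout_line.split()[2])
    if cluster_soft == 'slurm':
        # e.g., 'Submitted batch job 14428'
        return int(first_stdout_line.split()[3])
    if cluster_soft == 'pbs':
        # e.g., '2016614.zeldo.local'
        return int(first_stdout_line.split('.')[0])
    raise ValueError(f'Unrecognized cluster software: {cluster_soft}')

print(parse_job_id('Submitted batch job 14428', 'Slurm'))  # 14428
print(parse_job_id('2016614.zeldo.local', 'PBS'))          # 2016614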
24 changes: 18 additions & 6 deletions arc/settings/settings.py
@@ -87,22 +87,34 @@
}

check_status_command = {'OGE': 'export SGE_ROOT=/opt/sge; /opt/sge/bin/lx24-amd64/qstat',
'Slurm': '/usr/bin/squeue'}
'Slurm': '/usr/bin/squeue',
'PBS': '/usr/local/bin/qstat',
}

submit_command = {'OGE': 'export SGE_ROOT=/opt/sge; /opt/sge/bin/lx24-amd64/qsub',
'Slurm': '/usr/bin/sbatch'}
'Slurm': '/usr/bin/sbatch',
'PBS': '/usr/local/bin/qsub',
}

delete_command = {'OGE': 'export SGE_ROOT=/opt/sge; /opt/sge/bin/lx24-amd64/qdel',
'Slurm': '/usr/bin/scancel'}
'Slurm': '/usr/bin/scancel',
'PBS': '/usr/local/bin/qdel',
}

list_available_nodes_command = {'OGE': 'export SGE_ROOT=/opt/sge; /opt/sge/bin/lx24-amd64/qstat -f | grep "/8 " | grep "long" | grep -v "8/8"| grep -v "aAu"',
'Slurm': 'sinfo -o "%n %t %O %E"'}
'Slurm': 'sinfo -o "%n %t %O %E"',
'PBS': 'pbsnodes',
}

submit_filename = {'OGE': 'submit.sh',
'Slurm': 'submit.sl'}
'Slurm': 'submit.sl',
'PBS': 'submit.sh',
}

t_max_format = {'OGE': 'hours',
'Slurm': 'days'}
'Slurm': 'days',
'PBS': 'hours',
}

input_filename = {'gaussian': 'input.gjf',
'molpro': 'input.in',
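With these dictionaries extended, pointing ARC at a PBS machine is a matter of setting cluster_soft to 'PBS' in that server's entry. A hypothetical entry is sketched below; apart from cluster_soft, the field names (address, un, key, cpus, memory) are assumptions about ARC's servers dictionary rather than part of this diff, and the host name is borrowed from the docstring example:

# Hedged sketch of a PBS server entry; field names other than 'cluster_soft' are assumed.
servers = {
    'zeldo': {
        'cluster_soft': 'PBS',
        'address': 'zeldo.dow.com',
        'un': '<username>',
        'key': '/home/<username>/.ssh/id_rsa',
        'cpus': 10,
        'memory': 32,
    },
}

Note that the hard-coded /usr/local/bin paths for qstat, qsub, and qdel above may need to be adapted to wherever PBS is installed on a given cluster.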
