Skip to content

Commit

Permalink
Merge pull request #128 from ReactionMechanismGenerator/local2
Browse files Browse the repository at this point in the history
Run ARC on a server
  • Loading branch information
alongd committed May 8, 2019
2 parents f72a0a8 + cd8334d commit 1bb2739
Show file tree
Hide file tree
Showing 18 changed files with 688 additions and 346 deletions.
1 change: 1 addition & 0 deletions arc/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@

import job
import ssh
import local
import inputs
import submit
298 changes: 175 additions & 123 deletions arc/job/job.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions arc/job/jobTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def setUpClass(cls):
"""
cls.maxDiff = None
cls.ess_settings = {'gaussian': ['server1', 'server2'], 'molpro': ['server2'],
'qchem': ['server1'], 'onedmin': ['server1'], 'ssh': False}
'qchem': ['server1'], 'onedmin': ['server1']}
cls.job1 = Job(project='project_test', ess_settings=cls.ess_settings, species_name='tst_spc',
xyz='C 0.0 0.0 0.0', job_type='opt', level_of_theory='b3lyp/6-31+g(d)', multiplicity=1,
testing=True, project_directory=os.path.join(arc_path, 'Projects', 'project_test'),
Expand Down Expand Up @@ -61,7 +61,7 @@ def test_as_dict(self):
'pivots': [],
'project_directory': os.path.join(arc_path, 'Projects', 'project_test'),
'scan': '',
'server': None,
'server': 'server1',
'shift': '',
'max_job_time': 120,
'comments': '',
Expand Down
115 changes: 115 additions & 0 deletions arc/job/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#!/usr/bin/env python
# encoding: utf-8

"""
A module for running jobs on the local machine
When transitioning to Python 3, use subprocess.run(), https://docs.python.org/3/library/subprocess.html#subprocess.run
"""

from __future__ import (absolute_import, division, print_function, unicode_literals)
import subprocess
import os
import shutil
import datetime

from arc.job.ssh import check_job_status_in_stdout
from arc.settings import servers, check_status_command, submit_command, submit_filename, delete_command, output_filename

##################################################################


def execute_command(command, shell=True):
"""
Execute a command. `command` is an array of string commands to send.
If path is not an empty string, the command will be executed in the directory path it points to.
Returns lists of stdin, stdout, stderr corresponding to the commands sent.
`shell` specifies whether the command should be executed using bash instead of Python
"""
if not isinstance(command, list) and not shell:
command = [command]
stdout = subprocess.check_output(command, shell=shell)
return stdout.splitlines(), ''


def check_job_status(job_id):
"""
Possible statuses: `before_submission`, `running`, `errored on node xx`, `done`
Status line formats:
pharos: '540420 0.45326 xq1340b user_name r 10/26/2018 11:08:30 long1@node18.cluster'
rmg: '14428 debug xq1371m2 user_name R 50-04:04:46 1 node06'
"""
server = 'local'
cmd = check_status_command[servers[server]['cluster_soft']] + ' -u ' + servers[server]['un']
stdout = execute_command(cmd)[0]
return check_job_status_in_stdout(job_id=job_id, stdout=stdout, server=server)


def delete_job(job_id):
"""
Deletes a running job
"""
cmd = delete_command[servers['local']['cluster_soft']] + ' ' + str(job_id)
execute_command(cmd)


def check_running_jobs_ids():
"""
Return a list of ``int`` representing job IDs of all jobs submitted by the user on a server
"""
running_jobs_ids = list()
cmd = check_status_command[servers['local']['cluster_soft']] + ' -u ' + servers['local']['un']
stdout = execute_command(cmd)[0]
for i, status_line in enumerate(stdout):
if (servers['local']['cluster_soft'].lower() == 'slurm' and i > 0)\
or (servers['local']['cluster_soft'].lower() == 'oge' and i > 1):
running_jobs_ids.append(int(status_line.split()[0]))
return running_jobs_ids


def submit_job(path):
"""
Submit a job
`path` is the job's folder path, where the submit script is located (without the submit script file name)
"""
job_status = ''
job_id = 0
cmd = 'cd ' + path + '; ' + submit_command[servers['local']['cluster_soft']] + ' '\
+ submit_filename[servers['local']['cluster_soft']]
stdout = execute_command(cmd)[0]
if 'submitted' in stdout[0].lower():
job_status = 'running'
if servers['local']['cluster_soft'].lower() == 'oge':
job_id = int(stdout[0].split()[2])
elif servers['local']['cluster_soft'].lower() == 'slurm':
job_id = int(stdout[0].split()[3])
else:
raise ValueError('Unrecognized cluster software {0}'.format(servers['local']['cluster_soft']))
return job_status, job_id


def get_last_modified_time(file_path):
"""returns the last modified time of `file_path` in a datetime format"""
try:
timestamp = os.stat(file_path).st_mtime
except (IOError, OSError):
return None
return datetime.datetime.fromtimestamp(timestamp)


def write_file(file_path, file_string):
"""
Write `file_string` as the file's content in `file_path`
"""
with open(file_path, 'w') as f:
f.write(file_string)


def rename_output(local_file_path, software):
"""
Rename the output file to "output.out" for consistency between software
`local_file_path` is the full path to the output.out file,
`software` is the software used for the job by which the original output file name is determined
"""
software = software.lower()
if os.path.isfile(os.path.join(os.path.dirname(local_file_path), output_filename[software])):
shutil.move(src=os.path.join(os.path.dirname(local_file_path), output_filename[software]), dst=local_file_path)
59 changes: 59 additions & 0 deletions arc/job/localTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
This module contains unit tests of the arc.job.local module
"""

from __future__ import (absolute_import, division, print_function, unicode_literals)
import unittest
import os
import shutil
import datetime

import arc.job.local as local
from arc.settings import arc_path

################################################################################


class TestLocal(unittest.TestCase):
"""
Contains unit tests for the local module
"""

def test_execute_command(self):
"""Test executing a local command"""
command1 = 'ls'
out1 = local.execute_command(command1)
self.assertIsInstance(out1, tuple)
self.assertIsInstance(out1[0], list)
self.assertEqual(out1[1], '')
self.assertIn('arc', out1[0])
self.assertIn('ARC.py', out1[0])
self.assertIn('environment.yml', out1[0])

def test_get_last_modified_time(self):
"""Test the get_last_modified_time() function"""
path = os.path.join(arc_path, 'ARC.py')
t = local.get_last_modified_time(path)
self.assertIsInstance(t, datetime.datetime)

def test_rename_output(self):
"""Test the rename_output() function"""
path1 = os.path.join(arc_path, 'scratch', 'input.log')
path2 = os.path.join(arc_path, 'scratch', 'output.out')
if not os.path.exists(os.path.join(arc_path, 'scratch')):
os.makedirs(os.path.join(arc_path, 'scratch'))
with open(path1, 'w'):
pass
local.rename_output(local_file_path=path2, software='gaussian')
self.assertFalse(os.path.isfile(path1))
self.assertTrue(os.path.isfile(path2))
shutil.rmtree(os.path.join(arc_path, 'scratch'))

################################################################################


if __name__ == '__main__':
unittest.main(testRunner=unittest.TextTestRunner(verbosity=2))
73 changes: 43 additions & 30 deletions arc/job/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(self, server=''):

def send_command_to_server(self, command, remote_path=''):
"""
Send commands to server. `command` is an array of string commands to send.
Send commands to server. `command` is either a sting or an array of string commands to send.
If remote_path is not an empty string, the command will be executed in the directory path it points to.
Returns lists of stdin, stdout, stderr corresponding to the commands sent.
"""
Expand All @@ -52,6 +52,8 @@ def send_command_to_server(self, command, remote_path=''):
ssh.connect(hostname=self.address, username=self.un)
except:
return '', 'paramiko failed to connect'
if isinstance(command, list):
command = '; '.join(command)
if remote_path != '':
# execute command in remote_path directory.
# Since each `.exec_command()` is a single session, `cd` has to be added to all commands.
Expand Down Expand Up @@ -119,8 +121,8 @@ def _download_file(self, remote_file_path, local_file_path):
try:
sftp.get(remotepath=remote_file_path, localpath=local_file_path)
except IOError:
logging.warning('Got an IOError when trying to download file {0} from {1}'.format(remote_file_path,
self.server))
logging.debug('Got an IOError when trying to download file {0} from {1}'.format(remote_file_path,
self.server))
sftp.close()
ssh.close()

Expand Down Expand Up @@ -168,27 +170,7 @@ def _check_job_status(self, job_id):
logging.info('\n\n')
logging.error('Could not check status of job {0} due to {1}'.format(job_id, stderr))
return 'connection error'
for status_line in stdout:
if str(job_id) in status_line:
break
else:
return 'done'
status = status_line.split()[4]
if status.lower() in ['r', 'qw']:
return 'running'
else:
if servers[self.server]['cluster_soft'].lower() == 'oge':
if '.cluster' in status_line:
try:
return 'errored on node ' + status_line.split()[-1].split('@')[1].split('.')[0][-2:]
except IndexError:
return 'errored'
else:
return 'errored'
elif servers[self.server]['cluster_soft'].lower() == 'slurm':
return 'errored on node ' + status_line.split()[-1][-2:]
else:
raise ValueError('Unknown server {0}'.format(self.server))
return check_job_status_in_stdout(job_id=job_id, stdout=stdout, server=self.server)

def delete_job(self, job_id):
"""
Expand All @@ -203,7 +185,7 @@ def check_running_jobs_ids(self):
"""
running_jobs_ids = list()
cmd = check_status_command[servers[self.server]['cluster_soft']] + ' -u ' + servers[self.server]['un']
stdout, _ = self.send_command_to_server(cmd)
stdout = self.send_command_to_server(cmd)[0]
for i, status_line in enumerate(stdout):
if (servers[self.server]['cluster_soft'].lower() == 'slurm' and i > 0)\
or (servers[self.server]['cluster_soft'].lower() == 'oge' and i > 1):
Expand All @@ -214,8 +196,8 @@ def submit_job(self, remote_path):
"""Submit a job"""
job_status = ''
job_id = 0
cmd = submit_command[servers[self.server]['cluster_soft']] + ' ' +\
submit_filename[servers[self.server]['cluster_soft']]
cmd = submit_command[servers[self.server]['cluster_soft']] + ' '\
+ submit_filename[servers[self.server]['cluster_soft']]
stdout, stderr = self.send_command_to_server(cmd, remote_path)
if len(stderr) > 0 or len(stdout) == 0:
job_status = 'errored'
Expand Down Expand Up @@ -265,7 +247,7 @@ def try_connecting(self):
return sftp, ssh

def get_last_modified_time(self, remote_file_path):
"""returns the last modified time of `remote_file` in a datetime format"""
"""returns the last modified time of `remote_file_path` in a datetime format"""
sftp, ssh = self.connect()
try:
timestamp = sftp.stat(remote_file_path).st_mtime
Expand All @@ -277,8 +259,12 @@ def get_last_modified_time(self, remote_file_path):


def write_file(sftp, remote_file_path, local_file_path='', file_string=''):
"""Write content into a remote file (either write content or upload)"""
with sftp.open(remote_file_path, "w") as f_remote:
"""
Write a file in `file_path`
If `file_string` is given, write it as the content of the file
Else, if `local_file_path` is given, copy it to `remote_file_path`
"""
with sftp.open(remote_file_path, 'w') as f_remote:
if file_string:
f_remote.write(file_string)
elif local_file_path:
Expand All @@ -289,4 +275,31 @@ def write_file(sftp, remote_file_path, local_file_path='', file_string=''):
raise ValueError('Could not upload file to server. Either `file_string` or `local_file_path`'
' must be specified')


def check_job_status_in_stdout(job_id, stdout, server):
"""A helper function for checking job status"""
if not isinstance(stdout, list):
stdout = stdout.splitlines()
for status_line in stdout:
if str(job_id) in status_line:
break
else:
return 'done'
status = status_line.split()[4]
if status.lower() in ['r', 'qw', 't']:
return 'running'
else:
if servers[server]['cluster_soft'].lower() == 'oge':
if '.cluster' in status_line:
try:
return 'errored on node ' + status_line.split()[-1].split('@')[1].split('.')[0][-2:]
except IndexError:
return 'errored'
else:
return 'errored'
elif servers[server]['cluster_soft'].lower() == 'slurm':
return 'errored on node ' + status_line.split()[-1][-2:]
else:
raise ValueError('Unknown cluster software {0}'.format(servers[server]['cluster_soft']))

# TODO: delete scratch files of a failed job: ssh nodeXX; rm scratch/dhdhdhd/job_number
39 changes: 39 additions & 0 deletions arc/job/sshTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
This module contains unit tests of the arc.job.ssh module
"""

from __future__ import (absolute_import, division, print_function, unicode_literals)
import unittest

import arc.job.ssh as ssh

################################################################################


class TestSSH(unittest.TestCase):
"""
Contains unit tests for the SSH module
"""

def test_check_job_status_in_stdout(self):
"""Test checking the job status in stdout"""
stdout = """job-ID prior name user state submit/start at queue slots ja-task-ID
-----------------------------------------------------------------------------------------------------------------
582682 0.45451 a9654 alongd e 04/17/2019 16:22:14 long5@node93.cluster 48
588334 0.45451 pf1005a alongd r 05/07/2019 16:24:31 long3@node67.cluster 48
588345 0.45451 a14121 alongd r 05/08/2019 02:11:42 long3@node69.cluster 48 """
status1 = ssh.check_job_status_in_stdout(job_id=588345, stdout=stdout, server='server1')
self.assertEqual(status1, 'running')
status2 = ssh.check_job_status_in_stdout(job_id=582682, stdout=stdout, server='server1')
self.assertEqual(status2, 'errored')
status3 = ssh.check_job_status_in_stdout(job_id=582600, stdout=stdout, server='server1')
self.assertEqual(status3, 'done')

################################################################################


if __name__ == '__main__':
unittest.main(testRunner=unittest.TextTestRunner(verbosity=2))

0 comments on commit 1bb2739

Please sign in to comment.