Skip to content

Commit

Permalink
Fix python2 errors in local mode
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDacre committed Feb 10, 2018
1 parent ad253cd commit 53c046c
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
64 changes: 56 additions & 8 deletions fyrd/batch_systems/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
from collections import OrderedDict as _OD

try:
from queue import Empty
except ImportError: # python2
from Queue import Empty
except ImportError: # python3
from queue import Empty

from six import text_type as _txt
from six import string_types as _str
Expand All @@ -50,6 +50,8 @@

import Pyro4

from sqlalchemy.exc import InvalidRequestError

from sqlalchemy import create_engine as _create_engine
from sqlalchemy import Column as _Column
from sqlalchemy import String as _String
Expand Down Expand Up @@ -408,14 +410,37 @@ def get_server_uri(start=True):
return fin.read().strip()


RESTART_TRY = False


def get_server(start=True, raise_on_error=False):
"""Return a client-side QueueManager instance."""
uri = get_server_uri(start=start)
if not uri:
if raise_on_error:
raise QueueError('Cannot get server')
return None
return Pyro4.Proxy(uri)
server = Pyro4.Proxy(uri)
# Test for bad connection
try:
server._pyroBind()
except Pyro4.errors.CommunicationError:
global RESTART_TRY
if RESTART_TRY:
_logme.log(
"Cannot bind to server still. Failing. Try to kill the "
"process in {}".format(PID_FILE),
'critical'
)
if raise_on_error:
raise QueueError('Cannot get server')
return None
RESTART_TRY = True
_logme.log("Cannot bind to server, killing and retrying.", 'error')
kill_queue()
server = get_server(start, raise_on_error)
RESTART_TRY = False
return server


def _pid_exists(pid):
Expand Down Expand Up @@ -581,7 +606,15 @@ def submit(self, command, name, threads=1, dependencies=None,
job.errfile = stderr
if runpath:
job.runpath = runpath
session.add(job)
try:
session.add(job)
except InvalidRequestError:
# In case open in another thread
local_job = session.merge(job)
other_session = session.object_session(job)
session.add(local_job)
session.commit()
other_session.close()
session.flush()
jobno = int(job.jobno)
session.commit()
Expand Down Expand Up @@ -653,7 +686,15 @@ def clean(self, days=None):
return
session = self.db.get_session()
for job in jobs:
session.delete(job)
try:
session.delete(job)
except InvalidRequestError:
# In case open in another thread
local_job = session.merge(job)
other_session = session.object_session(job)
session.delete(local_job)
session.commit()
other_session.close()
session.commit()
session.close()

Expand Down Expand Up @@ -1100,14 +1141,16 @@ def daemonizer():


def shutdown_queue():
"""Kill the queue."""
"""Kill the server and queue gracefully."""
good = True
server = get_server(start=False)
if server:
try:
res = server.shutdown_jobs()
except OSError:
res = None
except Pyro4.errors.CommunicationError:
res = None
_logme.log('Local queue runner terminated.', 'debug')
if res is None:
_logme.log('Could not determine process completion state',
Expand All @@ -1120,6 +1163,13 @@ def shutdown_queue():
good = False
else:
_logme.log('Server appears already stopped', 'info')
kill_queue()
_logme.log('Local queue terminated', 'info')
return 0 if good else 1


def kill_queue():
"""Kill the server and queue without trying to clean jobs."""
if _os.path.isfile(PID_FILE):
with open(PID_FILE) as fin:
pid = int(fin.read().strip())
Expand All @@ -1129,8 +1179,6 @@ def shutdown_queue():
_os.kill(pid, _signal.SIGKILL)
if _os.path.isfile(URI_FILE):
_os.remove(URI_FILE)
_logme.log('Local queue terminated', 'info')
return 0 if good else 1


def daemon_manager(mode):
Expand Down
3 changes: 3 additions & 0 deletions tests/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ def test_make_job_file():
reason="No valid batch system detected")
def test_job_execution_paths():
"""Run a job and autoclean with defined paths."""
if os.path.isdir('out'):
os.removedirs('out')
os.system('rm -rf {}'.format('out'))
os.makedirs('out')
job = fyrd.Job('echo hi', profile='default', clean_files=True,
clean_outputs=True, scriptpath='..', outpath='.').submit()
Expand Down

0 comments on commit 53c046c

Please sign in to comment.