Revise job recovery behavior.
 - Fix bug where jobs may have been recovered before the MQ callback was set on the manager.
 - Add a new LOST job state for jobs that cannot be recovered and that Pulsar is going to abandon.
 - Fire off callback for LOST jobs.
 - Add test case to verify callback is called for LOST jobs.
jmchilton committed Apr 10, 2015
1 parent f1a9e83 commit 61ed774
Showing 6 changed files with 105 additions and 22 deletions.
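
In outline, the fix moves job recovery out of the StatefulManagerProxy constructor and behind an explicit recover_active_jobs() call that the application only makes once the message queue binding has installed its state change callback, so a LOST notification has somewhere to go. A simplified sketch of the resulting startup ordering follows; method names are taken from the diffs below, while the class name and stub bodies are illustrative only.

# Simplified, runnable sketch of the startup ordering this commit establishes;
# the real wiring is in pulsar/core.py and pulsar/managers/stateful.py below.
class PulsarApp(object):  # illustrative name for the app object in pulsar/core.py

    def __init__(self, **conf):
        self.__setup_managers(conf)               # builds StatefulManagerProxy objects
        self.__setup_bind_to_message_queue(conf)  # installs the MQ state change callback
        self.__recover_jobs()                     # only now replay persisted active jobs

    def __setup_managers(self, conf):
        self.managers = {}  # placeholder; real code uses build_managers(self, conf)

    def __setup_bind_to_message_queue(self, conf):
        pass  # placeholder; real code calls manager.set_state_change_callback(...)

    def __recover_jobs(self):
        for manager in self.managers.values():
            manager.recover_active_jobs()         # may fire LOST for unrecoverable jobs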
5 changes: 5 additions & 0 deletions pulsar/core.py
@@ -41,6 +41,7 @@ def __init__(self, **conf):
self.__setup_managers(conf)
self.__setup_file_cache(conf)
self.__setup_bind_to_message_queue(conf)
self.__recover_jobs()
self.ensure_cleanup = conf.get("ensure_cleanup", False)

def shutdown(self, timeout=None):
@@ -86,6 +87,10 @@ def __setup_staging_directory(self, staging_directory):
def __setup_managers(self, conf):
self.managers = build_managers(self, conf)

def __recover_jobs(self):
for manager in self.managers.values():
manager.recover_active_jobs()

def __setup_private_token(self, private_token):
self.private_token = private_token
if private_token:
7 changes: 5 additions & 2 deletions pulsar/managers/base/external.py
@@ -7,6 +7,7 @@

DEFAULT_JOB_NAME_TEMPLATE = "pulsar_$job_id"
JOB_FILE_EXTERNAL_ID = "external_id"
FAILED_TO_LOAD_EXTERNAL_ID = object()

log = logging.getLogger(__name__)

@@ -61,9 +62,11 @@ def _job_name(self, job_id):
return Template(self.job_name_template).safe_substitute(env)

def _recover_active_job(self, job_id):
external_id = self._job_directory(job_id).load_metadata(JOB_FILE_EXTERNAL_ID)
if external_id:
external_id = self._job_directory(job_id).load_metadata(JOB_FILE_EXTERNAL_ID, FAILED_TO_LOAD_EXTERNAL_ID)
if external_id and external_id is not FAILED_TO_LOAD_EXTERNAL_ID:
self._external_ids[job_id] = external_id
else:
raise Exception("Could not determine external ID for job_id [%s]" % job_id)

def _deactivate_job(self, job_id):
del self._external_ids[job_id]
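
The FAILED_TO_LOAD_EXTERNAL_ID sentinel is passed to load_metadata as its fallback value so that a missing or unreadable metadata file can be distinguished from any value that was genuinely stored. A standalone sketch of the pattern, not Pulsar's actual JobDirectory implementation:

# Illustration of the sentinel-default pattern; the file layout and helper
# names here are assumptions, not Pulsar's real JobDirectory code.
_FAILED_TO_LOAD = object()

def load_metadata(path, default=_FAILED_TO_LOAD):
    try:
        with open(path) as f:
            return f.read().strip()
    except IOError:
        return default

external_id = load_metadata("12345/external_id")
if external_id and external_id is not _FAILED_TO_LOAD:
    print("recovered external id %s" % external_id)
else:
    raise Exception("Could not determine external ID for job_id [12345]")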
25 changes: 19 additions & 6 deletions pulsar/managers/stateful.py
@@ -40,15 +40,17 @@ def __init__(self, manager, **manager_options):
self.__preprocess_action_executor = RetryActionExecutor(**preprocess_retry_action_kwds)
self.__postprocess_action_executor = RetryActionExecutor(**postprocess_retry_action_kwds)
self.min_polling_interval = datetime.timedelta(0, min_polling_interval)
self.active_jobs = ActiveJobs(manager)
self.__state_change_callback = lambda status, job_id: None
self.__recover_active_jobs()
self.active_jobs = ActiveJobs.from_manager(manager)
self.__state_change_callback = self._default_status_change_callback
self.__monitor = None

def set_state_change_callback(self, state_change_callback):
self.__state_change_callback = state_change_callback
self.__monitor = ManagerMonitor(self)

def _default_status_change_callback(self, status, job_id):
log.info("Status of job [%s] changed to [%s]. No callbacks enabled." % (status, job_id))

@property
def name(self):
return self._proxied_manager.name
@@ -163,7 +165,7 @@ def shutdown(self, timeout=None):
log.exception("Failed to shutdown job monitor for manager %s" % self.name)
super(StatefulManagerProxy, self).shutdown(timeout)

def __recover_active_jobs(self):
def recover_active_jobs(self):
recover_method = getattr(self._proxied_manager, "_recover_active_job", None)
if recover_method is None:
return
@@ -173,6 +175,12 @@
recover_method(job_id)
except Exception:
log.exception("Failed to recover active job %s" % job_id)
self.__handle_recovery_problem(job_id)

def __handle_recovery_problem(self, job_id):
# Make sure we tell the client we have lost this job.
self.active_jobs.deactivate_job(job_id)
self.__state_change_callback(status.LOST, job_id)


class ActiveJobs(object):
@@ -184,10 +192,15 @@ class ActiveJobs(object):
hit disk to recover this information.
"""

def __init__(self, manager):
@staticmethod
def from_manager(manager):
persistence_directory = manager.persistence_directory
manager_name = manager.name
return ActiveJobs(manager_name, persistence_directory)

def __init__(self, manager_name, persistence_directory):
if persistence_directory:
active_job_directory = os.path.join(persistence_directory, "%s-active-jobs" % manager.name)
active_job_directory = os.path.join(persistence_directory, "%s-active-jobs" % manager_name)
if not os.path.exists(active_job_directory):
os.makedirs(active_job_directory)
else:
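
The callback installed through set_state_change_callback receives (status, job_id), and with recovery now deferred until after that callback exists, a restart can surface abandoned jobs to the client. A hypothetical consumer of these updates might look like the following; mark_job_failed and finalize_job are made-up helper names, not part of Pulsar's client API.

# Hypothetical handler for the (status, job_id) callbacks fired by
# StatefulManagerProxy; mark_job_failed/finalize_job are illustrative stubs.
from pulsar.managers import status

def mark_job_failed(job_id, reason):
    print("job %s failed: %s" % (job_id, reason))

def finalize_job(job_id, job_status):
    print("job %s finished with status %s" % (job_id, job_status))

def handle_status_change(job_status, job_id):
    if job_status == status.LOST:
        # Pulsar could not recover this job after a restart - stop waiting
        # for it and record a failure on the client side.
        mark_job_failed(job_id, reason="lost during Pulsar recovery")
    elif status.is_job_done(job_status):
        finalize_job(job_id, job_status)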
13 changes: 12 additions & 1 deletion pulsar/managers/status.py
@@ -1,16 +1,27 @@
# TODO: Make objects.

# Job is staging and will be queued shortly.
PREPROCESSING = "preprocessing"
# Job manager has queued this job for execution.
QUEUED = "queued"
# Job manager believes the job is currently running.
RUNNING = "running"
# Job manager has finished and postprocessing ran successfully.
COMPLETE = "complete"
# Job was cancelled
CANCELLED = "cancelled"
# Problem submitting the job, interfacing with the job manager,
# or postprocessing the job.
FAILED = "failed"
# DRM marked job as complete and job is being unstaged.
POSTPROCESSING = "postprocessing"
# Pulsar believed this job to be active but the job manager
# cannot determine a state for it.
LOST = "lost"


def is_job_done(status):
""" Does the supplied status correspond to a finished
job (done processing).
"""
return status in [COMPLETE, CANCELLED, FAILED]
return status in [COMPLETE, CANCELLED, FAILED, LOST]
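
Because LOST is now in the terminal set, any polling loop built on is_job_done stops waiting for lost jobs as well, for example:

from pulsar.managers.status import is_job_done, LOST, RUNNING

assert is_job_done(LOST)           # a lost job counts as finished
assert not is_job_done(RUNNING)    # a running job is still pending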
76 changes: 63 additions & 13 deletions test/integration_test_restart.py
@@ -1,4 +1,7 @@
import contextlib
import threading
import time

from .test_utils import (
TempDirectoryTestCase,
skip_unless_module,
@@ -7,20 +10,17 @@
from pulsar.manager_endpoint_util import (
submit_job,
)
from pulsar.managers.stateful import ActiveJobs
from pulsar.client.amqp_exchange_factory import get_exchange
from pulsar.managers.util.drmaa import DrmaaSessionFactory
import time


class RestartTestCase(TempDirectoryTestCase):

@skip_unless_module("drmaa")
@skip_unless_module("kombu")
def test_restart_finishes_job(self):
mq_url = "memory://test1092"
app_conf = dict(message_queue_url=mq_url)
app_conf["managers"] = {"manager_restart": {'type': 'queued_drmaa'}}
with restartable_pulsar_app_provider(app_conf=app_conf, web=False) as app_provider:
with self._setup_app_provider("restart_and_finish") as app_provider:
job_id = '12345'

with app_provider.new_app() as app:
@@ -31,25 +31,71 @@ def test_restart_finishes_job(self):
'setup': True,
}
submit_job(manager, job_info)
# TODO: unfortunate breaking of abstractions here.
time.sleep(.2)
external_id = manager._proxied_manager._external_id(job_id)
external_id = None
for i in range(10):
time.sleep(.05)
# TODO: unfortunate breaking of abstractions here.
external_id = manager._proxied_manager._external_id(job_id)
if external_id:
break
if external_id is None:
assert False, "Test failed, couldn't get exteranl id for job id."

drmaa_session = DrmaaSessionFactory().get()
drmaa_session.kill(external_id)
drmaa_session.close()
time.sleep(.2)

consumer = SimpleConsumer(queue="status_update", url=mq_url, manager="manager_restart")
consumer = self._status_update_consumer("restart_and_finish")
consumer.start()

with app_provider.new_app() as app:
time.sleep(.3)
consumer.wait_for_messages()

consumer.join()
assert len(consumer.messages) == 1, len(consumer.messages)
assert consumer.messages[0]["status"] == "complete"

@skip_unless_module("drmaa")
@skip_unless_module("kombu")
def test_recovery_failure_fires_lost_status(self):
test = "restart_and_finish"
with self._setup_app_provider(test) as app_provider:
job_id = '12345'

with app_provider.new_app() as app:
persistence_directory = app.persistence_directory

# Break some abstractions to activate a job that
# never existed.
manager_name = "manager_%s" % test
active_jobs = ActiveJobs(manager_name, persistence_directory)
active_jobs.activate_job(job_id)

consumer = self._status_update_consumer(test)
consumer.start()

with app_provider.new_app() as app:
consumer.wait_for_messages()

consumer.join()

assert len(consumer.messages) == 1, len(consumer.messages)
assert consumer.messages[0]["status"] == "lost"

@contextlib.contextmanager
def _setup_app_provider(self, test):
mq_url = "memory://test_%s" % test
manager = "manager_%s" % test
app_conf = dict(message_queue_url=mq_url)
app_conf["managers"] = {manager: {'type': 'queued_drmaa'}}
with restartable_pulsar_app_provider(app_conf=app_conf, web=False) as app_provider:
yield app_provider

def _status_update_consumer(self, test):
mq_url = "memory://test_%s" % test
manager = "manager_%s" % test
consumer = SimpleConsumer(queue="status_update", url=mq_url, manager=manager)
return consumer


class SimpleConsumer(object):

@@ -58,7 +104,7 @@ def __init__(self, queue, url, manager="_default_"):
self.url = url
self.manager = manager
self.active = True
self.exchange = get_exchange("memory://test1092", manager, {})
self.exchange = get_exchange(url, manager, {})

self.messages = []

Expand All @@ -71,6 +117,10 @@ def join(self):
self.active = False
self.thread.join(10)

def wait_for_messages(self, n=1):
while len(self.messages) < n:
time.sleep(.05)

def _run(self):
self.exchange.consume("status_update", self._callback, check=self)

1 change: 1 addition & 0 deletions test/persistence_test.py
@@ -33,6 +33,7 @@ def test_persistence():
assert (not(exists(touch_file)))
queue1.shutdown()
queue2 = StatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=1))
queue2.recover_active_jobs()
time.sleep(1)
assert exists(touch_file)
finally:
