Skip to content

Commit

Permalink
Docstring fixes for lib/galaxy/jobs/runners/pulsar.py.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Mar 31, 2016
1 parent 07eb322 commit d629822
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions lib/galaxy/jobs/runners/pulsar.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""Job runner used to execute Galaxy jobs through Pulsar.
More infromation on Pulsar can be found at http://pulsar.readthedocs.org/.
"""
from __future__ import absolute_import # Need to import pulsar_client absolutely.

import logging
Expand Down Expand Up @@ -27,7 +31,11 @@

log = logging.getLogger( __name__ )

__all__ = [ 'PulsarLegacyJobRunner', 'PulsarRESTJobRunner', 'PulsarMQJobRunner' ]
__all__ = [
'PulsarLegacyJobRunner',
'PulsarRESTJobRunner',
'PulsarMQJobRunner',
]

NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "Pulsar misconfiguration - Pulsar client configured to set metadata remotely, but remote Pulsar isn't properly configured with a galaxy_home directory."
NO_REMOTE_DATATYPES_CONFIG = "Pulsar client is configured to use remote datatypes configuration when setting metadata externally, but Pulsar is not configured with this information. Defaulting to datatypes_conf.xml."
Expand Down Expand Up @@ -133,13 +141,12 @@


class PulsarJobRunner( AsynchronousJobRunner ):
"""
Pulsar Job Runner
"""
"""Base class for pulsar job runners."""

runner_name = "PulsarJobRunner"

def __init__( self, app, nworkers, **kwds ):
"""Start the job runner """
"""Start the job runner."""
super( PulsarJobRunner, self ).__init__( app, nworkers, runner_param_specs=PULSAR_PARAM_SPECS, **kwds )
self._init_worker_threads()
galaxy_url = self.runner_params.galaxy_url
Expand All @@ -165,7 +172,7 @@ def __init_client_manager( self ):
self.client_manager = build_client_manager(**client_manager_kwargs)

def url_to_destination( self, url ):
"""Convert a legacy URL to a job destination"""
"""Convert a legacy URL to a job destination."""
return JobDestination( runner="pulsar", params=url_to_destination_params( url ) )

def check_watched_item(self, job_state):
Expand Down Expand Up @@ -243,7 +250,7 @@ def queue_job(self, job_wrapper):
self.monitor_job(pulsar_job_state)

def __prepare_job(self, job_wrapper, job_destination):
""" Build command-line and Pulsar client for this job. """
"""Build command-line and Pulsar client for this job."""
command_line = None
client = None
remote_job_config = None
Expand Down Expand Up @@ -424,9 +431,7 @@ def finish_job( self, job_state ):
job_wrapper.fail("Unable to finish job", exception=True)

def fail_job( self, job_state, message=GENERIC_REMOTE_ERROR ):
"""
Seperated out so we can use the worker threads for it.
"""
"""Seperated out so we can use the worker threads for it."""
self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) )
job_state.job_wrapper.fail( getattr( job_state, "fail_message", message ) )

Expand Down Expand Up @@ -475,7 +480,7 @@ def stop_job( self, job ):
client.kill()

def recover( self, job, job_wrapper ):
"""Recovers jobs stuck in the queued/running state when Galaxy started"""
"""Recover jobs stuck in the queued/running state when Galaxy started."""
job_state = self._job_state( job, job_wrapper )
job_wrapper.command_line = job.get_command_line()
state = job.get_state()
Expand Down Expand Up @@ -538,7 +543,9 @@ def __remote_metadata( pulsar_client ):

@staticmethod
def __use_remote_datatypes_conf( pulsar_client ):
""" When setting remote metadata, use integrated datatypes from this
"""Use remote metadata datatypes instead of Galaxy's.
When setting remote metadata, use integrated datatypes from this
Galaxy instance or use the datatypes config configured via the remote
Pulsar.
Expand Down Expand Up @@ -604,13 +611,17 @@ def __build_metadata_configuration(self, client, job_wrapper, remote_metadata, r


class PulsarLegacyJobRunner( PulsarJobRunner ):
"""Flavor of Pulsar job runner mimicking behavior of old LWR runner."""

destination_defaults = dict(
rewrite_parameters="false",
dependency_resolution="local",
)


class PulsarMQJobRunner( PulsarJobRunner ):
"""Flavor of Pulsar job runner with sensible defaults for message queue communication."""

destination_defaults = dict(
default_file_action="remote_transfer",
rewrite_parameters="true",
Expand Down Expand Up @@ -640,6 +651,8 @@ def __async_update( self, full_status ):


class PulsarRESTJobRunner( PulsarJobRunner ):
"""Flavor of Pulsar job runner with sensible defaults for RESTful usage."""

destination_defaults = dict(
default_file_action="transfer",
rewrite_parameters="true",
Expand Down

0 comments on commit d629822

Please sign in to comment.