Skip to content

Commit

Permalink
Support for varying cache timeout, specifying pulsar runner id and do…
Browse files Browse the repository at this point in the history
…c updates
  • Loading branch information
nuwang committed Nov 13, 2018
1 parent 1ee3424 commit 2e08d70
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 19 deletions.
2 changes: 2 additions & 0 deletions docs/samples/job_conf.xml.basic
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
<param id="cloudlaunch_api_endpoint">https://launch.usegalaxy.org/cloudlaunch/api/v1</param>
<!-- Obtain your CloudLaunch token by visiting: https://launch.usegalaxy.org/profile -->
<param id="cloudlaunch_api_token">37c46c89bcbea797bc7cd76fee10932d2c6a2389</param>
<!-- id of the pulsar runner plugin. Defaults to "pulsar" -->
<param id="pulsar_runner_id">pulsar</param>
<!-- Destination to fallback to if no nodes are available -->
<param id="fallback_destination">local</param>
</destination>
Expand Down
2 changes: 2 additions & 0 deletions docs/samples/job_conf.xml.burst_if_queued
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
<param id="cloudlaunch_api_endpoint">https://launch.usegalaxy.org/cloudlaunch/api/v1</param>
<!-- Obtain your CloudLaunch token by visiting: https://launch.usegalaxy.org/profile -->
<param id="cloudlaunch_api_token">37c46c89bcbea797bc7cd76fee10932d2c6a2389</param>
<!-- id of the pulsar runner plugin. Defaults to "pulsar" -->
<param id="pulsar_runner_id">pulsar</param>
<!-- Destination to fallback to if no nodes are available -->
<param id="fallback_destination_id">local</param>
</destination>
Expand Down
2 changes: 2 additions & 0 deletions docs/samples/job_conf.xml.burst_if_size
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
<param id="cloudlaunch_api_endpoint">https://launch.usegalaxy.org/cloudlaunch/api/v1</param>
<!-- Obtain your CloudLaunch token by visiting: https://launch.usegalaxy.org/profile -->
<param id="cloudlaunch_api_token">37c46c89bcbea797bc7cd76fee10932d2c6a2389</param>
<!-- id of the pulsar runner plugin. Defaults to "pulsar" -->
<param id="pulsar_runner_id">pulsar</param>
<!-- Destination to fallback to if no nodes are available -->
<param id="fallback_destination_id">local</param>
</destination>
Expand Down
46 changes: 36 additions & 10 deletions docs/topics/overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Configuring Galaxy
`<galaxy_home>/lib/galaxy/jobs/rules/`.

Create a file named galaxycloudrunner.py and paste the following contents
into the file at the location above .
into the file at the location above.

.. literalinclude:: ../../galaxycloudrunner/rules/cloudlaunch_pulsar.py
:language: python
Expand All @@ -24,19 +24,22 @@ Configuring Galaxy
highlighted sections to it.

You will need to add your own ``cloudlaunch_api_token`` to the file.
Instructions on how to obtain your CloudLaunch API key is given below.
Instructions on how to obtain your CloudLaunch API key are given below.
If you have a Galaxy version prior to 19.01, the line
`<param id="rules_module">galaxycloudrunner.rules</param>` passed to your
destination will not work. This is the reason that we need to perform step 2.

.. literalinclude:: ../samples/job_conf.xml.basic
:language: xml
:linenos:
:emphasize-lines: 7,9-18
:emphasize-lines: 7,9-20

4. Launch as many Pulsar nodes as you need through `CloudLaunch`_.
4. Launch as many Pulsar nodes as you need through `CloudLaunch`_. The job rule
will periodically query CloudLaunch, discover these new nodes, and route jobs
to them.
Instructions on how to launch new Pulsar nodes are below.


5. Submit your jobs as usual.


Expand Down Expand Up @@ -108,7 +111,8 @@ default. This works as follows:
the GalaxyCloudRunner. This has implications for node addition and in
particular removal. When adding a node, there could be a delay of a few
minutes before the node is picked up. If a Pulsar node is removed, your jobs
may be routed to a dead node for the duration of the caching period.
may be routed to a dead node for the duration of the caching period. See
:ref:`additional-configuration` on how to change this cache period.
4. If no node is available, it will return the ``fallback_destination_id``, if
specified, in which case the job will be routed there. If no
``fallback_destination_id`` is specified, the job will be re-queued till a node
Expand All @@ -129,16 +133,16 @@ a configuration like the following.
:linenos:
:emphasize-lines: 8,10-16

Note the emphasized lines. In this example, we route to the built-in rule,
Note the emphasized lines. In this example, we route to the built-in rule
``burst`` first, which determines whether or not the cloud bursting
should occur. It examines how many jobs in the
``from_destinations`` are in the given state (``queued`` in this case),
and if they are above ``num_jobs``, routes to the
``galaxy_cloud_runner`` destination. If bursting should not occur, it routes
the first destination in the ``from_destinations`` list. This provides a simple
method to scale to Pulsar nodes only if a desired queue has a backlog of jobs.
You may need to experiment with these values to find ones that work best
for your requirements.
to the first destination in the ``from_destinations`` list. This provides a
simple method to scale to Pulsar nodes only if a desired queue has a backlog
of jobs. You may need to experiment with these values to find ones that work
best for your requirements.

Advanced bursting
-----------------
Expand All @@ -157,5 +161,27 @@ destination, which will check the total size of the input files. If they are
less than 1GB, they are routed to the GalaxyCloudRunner. If not, they are
routed to a local queue.

.. _additional-configuration:

Additional Configuration and Limitations
----------------------------------------

1. Configuring the query timeout
You can set the environment variable ``CLOUDLAUNCH_QUERY_CACHE_PERIOD`` before
starting Galaxy to control the caching period. Setting this to 0 will allow you
to get around the node removal issue (If a Pulsar node is removed, your jobs
may be routed to a dead node for the duration of the caching period.), but we
recommend setting a value to avoid repeatedly querying a remote server during
each job submission.

2. Auto-scaling
Currently, the GalaxyCloudRunner does not support automatic scaling, you must
manually add and remove nodes. We will be adding autoscaling features as
part of CloudMan v2.0 in future.

3. Galaxy versions prior to 19.01
Galaxy versions prior to 19.01 do not support certain features required by
GalaxyCloudRunner and therefore, need more complex configuration steps.

.. _https://launch.usegalaxy.org/: https://launch.usegalaxy.org/
.. _CloudLaunch: https://launch.usegalaxy.org/
5 changes: 4 additions & 1 deletion galaxycloudrunner/rules/cloudlaunch_pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

def cloudlaunch_pulsar_burst(app, cloudlaunch_api_endpoint=None,
cloudlaunch_api_token=None,
pulsar_runner_id="pulsar",
fallback_destination_id=None):
return get_destination(app, cloudlaunch_api_endpoint,
cloudlaunch_api_token, fallback_destination_id)
cloudlaunch_api_token,
pulsar_runner_id,
fallback_destination_id)
16 changes: 10 additions & 6 deletions galaxycloudrunner/rules/helper_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@ def __sum_total(prev, current):
return prev + current


def __calculate_dataset_total(datasets):
if datasets:
return reduce(__sum_total,
map(__get_dataset_size, datasets))
else:
return 0


def to_destination_if_size(job, max_size, to_destination_id,
fallback_destination_id):
"""
A rule that will route a job to the "to_destination_id" if
the input size is below a certain threshold, or to the
"fallback_destination_id" if not.
"""
total_input_size = (reduce(
__sum_total, map(__get_dataset_size, job.input_datasets))
if job.input_datasets else 0)
total_library_size = (reduce(
__sum_total, map(__get_dataset_size, job.input_library_datasets))
if job.input_library_datasets else 0)
total_input_size = __calculate_dataset_total(job.input_datasets)
total_library_size = __calculate_dataset_total(job.input_library_datasets)
if (total_input_size+total_library_size) <= size_to_bytes(max_size):
return to_destination_id
else:
Expand Down
8 changes: 6 additions & 2 deletions galaxycloudrunner/runners/cl_pulsar_burst.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from galaxy.jobs import JobDestination
from galaxy.jobs.mapper import JobNotReadyException


CACHE_TIMEOUT = os.environ.get('CLOUDLAUNCH_QUERY_CACHE_PERIOD', 300)

# Global variable for tracking round-robin index
current_server_index = 0

Expand All @@ -20,7 +23,7 @@ def _get_cloudlaunch_client(cloudlaunch_api_endpoint, cloudlaunch_api_token):
return APIClient(cloudlaunch_url, token=cloudlaunch_token)


@cachetools.cached(cachetools.TTLCache(maxsize=1, ttl=300))
@cachetools.cached(cachetools.TTLCache(maxsize=1, ttl=CACHE_TIMEOUT))
def _get_pulsar_servers(cloudlaunch_client):
"""
Return an array of tuples, consisting of the Pulsar url and auth token.
Expand Down Expand Up @@ -64,6 +67,7 @@ def get_next_server(cloudlaunch_api_endpoint, cloudlaunch_api_token):

def get_destination(app, cloudlaunch_api_endpoint=None,
cloudlaunch_api_token=None,
pulsar_runner_id="pulsar",
fallback_destination_id=None):
"""
Returns an available Pulsar JobDestination by querying
Expand All @@ -75,7 +79,7 @@ def get_destination(app, cloudlaunch_api_endpoint=None,
url, token = get_next_server(cloudlaunch_api_endpoint,
cloudlaunch_api_token)
if url:
return JobDestination(runner="pulsar",
return JobDestination(runner=pulsar_runner_id,
params={"url": url,
"private_token": token})
elif fallback_destination_id:
Expand Down

0 comments on commit 2e08d70

Please sign in to comment.