diff --git a/docs/samples/job_conf.xml.basic b/docs/samples/job_conf.xml.basic index cac07b0..36bcf08 100644 --- a/docs/samples/job_conf.xml.basic +++ b/docs/samples/job_conf.xml.basic @@ -13,6 +13,8 @@ https://launch.usegalaxy.org/cloudlaunch/api/v1 37c46c89bcbea797bc7cd76fee10932d2c6a2389 + + pulsar local diff --git a/docs/samples/job_conf.xml.burst_if_queued b/docs/samples/job_conf.xml.burst_if_queued index df65dd4..ea422b8 100644 --- a/docs/samples/job_conf.xml.burst_if_queued +++ b/docs/samples/job_conf.xml.burst_if_queued @@ -21,6 +21,8 @@ https://launch.usegalaxy.org/cloudlaunch/api/v1 37c46c89bcbea797bc7cd76fee10932d2c6a2389 + + pulsar local diff --git a/docs/samples/job_conf.xml.burst_if_size b/docs/samples/job_conf.xml.burst_if_size index 8956cfd..910ad09 100644 --- a/docs/samples/job_conf.xml.burst_if_size +++ b/docs/samples/job_conf.xml.burst_if_size @@ -29,6 +29,8 @@ https://launch.usegalaxy.org/cloudlaunch/api/v1 37c46c89bcbea797bc7cd76fee10932d2c6a2389 + + pulsar local diff --git a/docs/topics/overview.rst b/docs/topics/overview.rst index 4e31d2c..d28b4bb 100644 --- a/docs/topics/overview.rst +++ b/docs/topics/overview.rst @@ -14,7 +14,7 @@ Configuring Galaxy `/lib/galaxy/jobs/rules/`. Create a file named galaxycloudrunner.py and paste the following contents - into the file at the location above . + into the file at the location above. .. literalinclude:: ../../galaxycloudrunner/rules/cloudlaunch_pulsar.py :language: python @@ -24,7 +24,7 @@ Configuring Galaxy highlighted sections to it. You will need to add your own ``cloudlaunch_api_token`` to the file. - Instructions on how to obtain your CloudLaunch API key is given below. + Instructions on how to obtain your CloudLaunch API key are given below. If you have a Galaxy version prior to 19.01, the line `galaxycloudrunner.rules` passed to your destination will not work. This is the reason that we need to perform step 2. @@ -32,11 +32,14 @@ Configuring Galaxy .. literalinclude:: ../samples/job_conf.xml.basic :language: xml :linenos: - :emphasize-lines: 7,9-18 + :emphasize-lines: 7,9-20 -4. Launch as many Pulsar nodes as you need through `CloudLaunch`_. +4. Launch as many Pulsar nodes as you need through `CloudLaunch`_. The job rule + will periodically query CloudLaunch, discover these new nodes, and route jobs + to them. Instructions on how to launch new Pulsar nodes are below. + 5. Submit your jobs as usual. @@ -108,7 +111,8 @@ default. This works as follows: the GalaxyCloudRunner. This has implications for node addition and in particular removal. When adding a node, there could be a delay of a few minutes before the node is picked up. If a Pulsar node is removed, your jobs - may be routed to a dead node for the duration of the caching period. + may be routed to a dead node for the duration of the caching period. See + :ref:`additional-configuration` on how to change this cache period. 4. If no node is available, it will return the ``fallback_destination_id``, if specified, in which case the job will be routed there. If no ``fallback_destination_id`` is specified, the job will be re-queued till a node @@ -129,16 +133,16 @@ a configuration like the following. :linenos: :emphasize-lines: 8,10-16 -Note the emphasized lines. In this example, we route to the built-in rule, +Note the emphasized lines. In this example, we route to the built-in rule ``burst`` first, which determines whether or not the cloud bursting should occur. It examines how many jobs in the ``from_destinations`` are in the given state (``queued`` in this case), and if they are above ``num_jobs``, routes to the ``galaxy_cloud_runner`` destination. If bursting should not occur, it routes -the first destination in the ``from_destinations`` list. This provides a simple -method to scale to Pulsar nodes only if a desired queue has a backlog of jobs. -You may need to experiment with these values to find ones that work best -for your requirements. +to the first destination in the ``from_destinations`` list. This provides a +simple method to scale to Pulsar nodes only if a desired queue has a backlog +of jobs. You may need to experiment with these values to find ones that work +best for your requirements. Advanced bursting ----------------- @@ -157,5 +161,27 @@ destination, which will check the total size of the input files. If they are less than 1GB, they are routed to the GalaxyCloudRunner. If not, they are routed to a local queue. +.. _additional-configuration: + +Additional Configuration and Limitations +---------------------------------------- + +1. Configuring the query timeout + You can set the environment variable ``CLOUDLAUNCH_QUERY_CACHE_PERIOD`` before + starting Galaxy to control the caching period. Setting this to 0 will allow you + to get around the node removal issue (If a Pulsar node is removed, your jobs + may be routed to a dead node for the duration of the caching period.), but we + recommend setting a value to avoid repeatedly querying a remote server during + each job submission. + +2. Auto-scaling + Currently, the GalaxyCloudRunner does not support automatic scaling, you must + manually add and remove nodes. We will be adding autoscaling features as + part of CloudMan v2.0 in future. + +3. Galaxy versions prior to 19.01 + Galaxy versions prior to 19.01 do not support certain features required by + GalaxyCloudRunner and therefore, need more complex configuration steps. + .. _https://launch.usegalaxy.org/: https://launch.usegalaxy.org/ .. _CloudLaunch: https://launch.usegalaxy.org/ \ No newline at end of file diff --git a/galaxycloudrunner/rules/cloudlaunch_pulsar.py b/galaxycloudrunner/rules/cloudlaunch_pulsar.py index 0fb5efe..395b4f9 100644 --- a/galaxycloudrunner/rules/cloudlaunch_pulsar.py +++ b/galaxycloudrunner/rules/cloudlaunch_pulsar.py @@ -3,6 +3,9 @@ def cloudlaunch_pulsar_burst(app, cloudlaunch_api_endpoint=None, cloudlaunch_api_token=None, + pulsar_runner_id="pulsar", fallback_destination_id=None): return get_destination(app, cloudlaunch_api_endpoint, - cloudlaunch_api_token, fallback_destination_id) + cloudlaunch_api_token, + pulsar_runner_id, + fallback_destination_id) diff --git a/galaxycloudrunner/rules/helper_rules.py b/galaxycloudrunner/rules/helper_rules.py index 50682e2..9d393af 100644 --- a/galaxycloudrunner/rules/helper_rules.py +++ b/galaxycloudrunner/rules/helper_rules.py @@ -14,6 +14,14 @@ def __sum_total(prev, current): return prev + current +def __calculate_dataset_total(datasets): + if datasets: + return reduce(__sum_total, + map(__get_dataset_size, datasets)) + else: + return 0 + + def to_destination_if_size(job, max_size, to_destination_id, fallback_destination_id): """ @@ -21,12 +29,8 @@ def to_destination_if_size(job, max_size, to_destination_id, the input size is below a certain threshold, or to the "fallback_destination_id" if not. """ - total_input_size = (reduce( - __sum_total, map(__get_dataset_size, job.input_datasets)) - if job.input_datasets else 0) - total_library_size = (reduce( - __sum_total, map(__get_dataset_size, job.input_library_datasets)) - if job.input_library_datasets else 0) + total_input_size = __calculate_dataset_total(job.input_datasets) + total_library_size = __calculate_dataset_total(job.input_library_datasets) if (total_input_size+total_library_size) <= size_to_bytes(max_size): return to_destination_id else: diff --git a/galaxycloudrunner/runners/cl_pulsar_burst.py b/galaxycloudrunner/runners/cl_pulsar_burst.py index 146f107..be77e51 100644 --- a/galaxycloudrunner/runners/cl_pulsar_burst.py +++ b/galaxycloudrunner/runners/cl_pulsar_burst.py @@ -6,6 +6,9 @@ from galaxy.jobs import JobDestination from galaxy.jobs.mapper import JobNotReadyException + +CACHE_TIMEOUT = os.environ.get('CLOUDLAUNCH_QUERY_CACHE_PERIOD', 300) + # Global variable for tracking round-robin index current_server_index = 0 @@ -20,7 +23,7 @@ def _get_cloudlaunch_client(cloudlaunch_api_endpoint, cloudlaunch_api_token): return APIClient(cloudlaunch_url, token=cloudlaunch_token) -@cachetools.cached(cachetools.TTLCache(maxsize=1, ttl=300)) +@cachetools.cached(cachetools.TTLCache(maxsize=1, ttl=CACHE_TIMEOUT)) def _get_pulsar_servers(cloudlaunch_client): """ Return an array of tuples, consisting of the Pulsar url and auth token. @@ -64,6 +67,7 @@ def get_next_server(cloudlaunch_api_endpoint, cloudlaunch_api_token): def get_destination(app, cloudlaunch_api_endpoint=None, cloudlaunch_api_token=None, + pulsar_runner_id="pulsar", fallback_destination_id=None): """ Returns an available Pulsar JobDestination by querying @@ -75,7 +79,7 @@ def get_destination(app, cloudlaunch_api_endpoint=None, url, token = get_next_server(cloudlaunch_api_endpoint, cloudlaunch_api_token) if url: - return JobDestination(runner="pulsar", + return JobDestination(runner=pulsar_runner_id, params={"url": url, "private_token": token}) elif fallback_destination_id: