Collections API: Update Celery to be Synchronous #2167

Merged (6 commits, Aug 23, 2017)
2 changes: 1 addition & 1 deletion Vagrantfile
@@ -94,7 +94,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
worker.vm.synced_folder ENV.fetch("RWD_DATA", "/tmp"), "/opt/rwd-data"

# AWS
worker.vm.synced_folder "~/.aws", "/aws"
worker.vm.synced_folder "~/.aws", "/var/lib/mmw/.aws"

# Docker
worker.vm.network "forwarded_port", {
2 changes: 1 addition & 1 deletion deployment/ansible/group_vars/all
@@ -45,7 +45,7 @@ docker_options: "--storage-driver=aufs"
geop_host: "localhost"
geop_port: 8090

geop_version: "3.0.0-alpha"
geop_version: "3.0.0-alpha-2"
geop_cache_enabled: 1

nginx_cache_dir: "/var/cache/nginx"
2 changes: 1 addition & 1 deletion deployment/ansible/roles.yml
@@ -45,6 +45,6 @@
- src: azavea.beaver
version: 1.0.1
- src: azavea.java
-version: 0.5.0
+version: 0.6.1
- src: azavea.docker
version: 1.0.2
189 changes: 22 additions & 167 deletions src/mmw/apps/modeling/geoprocessing.py
@@ -9,7 +9,7 @@
from ast import literal_eval as make_tuple

from celery import shared_task
-from celery.exceptions import MaxRetriesExceededError, Retry
+from celery.exceptions import Retry

from requests.exceptions import ConnectionError

@@ -20,30 +20,27 @@


@shared_task(bind=True, default_retry_delay=1, max_retries=42)
-def start(self, opname, input_data, wkaoi=None):
+def run(self, opname, input_data, wkaoi=None):
"""
Start a geoproessing operation.
Run a geoprocessing operation.

Given an operation name and a dictionary of input data, looks up the
operation from the list of supported operations in settings.GEOP['json'],
combines it with input data, and submits it to Spark JobServer.

This task must always be succeeded by `finish` below.
combines it with input data, and submits it to the Geoprocessing Service.

All errors are passed along and not raised here, so that error handling can
be attached to the final task in the chain, without needing to be attached
to every task.

If a well-known area of interest id is specified in wkaoi, checks to see
if there is a cached result for that wkaoi and operation. If so, returns
that immediately in the 'cached' key. If not, starts the geoprocessing
operation while also passing along the cache key to the next step, so that
the results of geoprocessing may be cached.
that immediately. If not, starts the geoprocessing operation, and saves the
results to they key before passing them on.

:param opname: Name of operation. Must exist in settings.GEOP['json']
:param input_data: Dictionary of values to extend base operation JSON with
:param wkaoi: String id of well-known area of interest. "{table}__{id}"
:return: Dictionary containing either job_id if successful, error if not
:return: Dictionary containing either results if successful, error if not
"""
    if opname not in settings.GEOP['json']:
        return {
@@ -55,195 +52,53 @@ def start(self, opname, input_data, wkaoi=None):
            'error': 'Input data cannot be empty'
        }

-    outgoing = {}
+    key = ''

    if wkaoi and settings.GEOP['cache']:
        key = 'geop_{}__{}'.format(wkaoi, opname)
-        outgoing['key'] = key
        cached = cache.get(key)
        if cached:
-            outgoing['cached'] = cached
-            return outgoing
+            return cached

    data = settings.GEOP['json'][opname].copy()
    data['input'].update(input_data)

    try:
-        outgoing['job_id'] = sjs_submit(data, self.retry)
-        return outgoing
-    except Retry as r:
-        raise r
-    except Exception as x:
-        return {
-            'error': x.message
-        }


-@shared_task(bind=True, default_retry_delay=1, max_retries=42)
-def finish(self, incoming):
-    """
-    Retrieve results of geoprocessing.
-
-    To be used immediately after the `start` task, this takes the incoming
-    data and inspects it to see if there are any reported errors. If found,
-    the errors are passed through to the next task. Otherwise, the incoming
-    parameters are used to retrieve the job from Spark JobServer, and those
-    results are returned.
-
-    This task must always be preceded by `start` above. The succeeding task
-    must take the raw JSON values and process them into information. The JSON
-    output will look like:
-
-        {
-            'List(1,2)': 3,
-            'List(4,5)': 6
-        }
-
-    where the values and number of items depend on the input.
-
-    All errors are passed along and not raised here, so that error handling can
-    be attached to the final task in the chain, without needing to be attached
-    to every task.
-
-    If the incoming set of values contains a 'cached' key, then its contents
-    are returned immediately. If there is a 'key' key, then the results of
-    geoprocessing will be saved to the cache with that key before returning.
-
-    :param incoming: Dictionary containing job_id or error
-    :return: Dictionary of Spark JobServer results, or error
-    """
-    if 'error' in incoming:
-        return incoming
-
-    if 'cached' in incoming:
-        return incoming['cached']
-
-    try:
-        result = sjs_retrieve(incoming['job_id'], self.retry)
-        if 'key' in incoming:
-            cache.set(incoming['key'], result, None)
-
+        result = geoprocess(data, self.retry)
+        if key:
+            cache.set(key, result, None)
        return result
    except Retry as r:
+        # Celery throws a Retry exception when self.retry is called to stop
+        # the execution of any further code, and to indicate to the worker
+        # that the same task is going to be retried.
+        # We capture and re-raise Retry to continue this behavior, and ensure
+        # that it doesn't get passed to the next task like every other error.
        raise r
    except Exception as x:
        return {
            'error': x.message
        }


-@statsd.timer(__name__ + '.sjs_submit')
-def sjs_submit(data, retry=None):
+@statsd.timer(__name__ + '.geop_run')
+def geoprocess(data, retry=None):
    """
-    Submits a job to Spark Job Server. Returns its Job ID, which
-    can be used with sjs_retrieve to get the final result.
+    Submit a request to the geoprocessing service. Returns its result.
    """
    host = settings.GEOP['host']
    port = settings.GEOP['port']
-    args = settings.GEOP['args']
-
-    base_url = 'http://{}:{}'.format(host, port)
-    jobs_url = '{}/jobs?{}'.format(base_url, args)
-
-    try:
-        response = requests.post(jobs_url, data=json.dumps(data))
-    except ConnectionError as exc:
-        if retry is not None:
-            retry(exc=exc)
-
-    if response.ok:
-        job = response.json()
-    else:
-        error = response.json()
-
-        if error['status'] == 'NO SLOTS AVAILABLE' and retry is not None:
-            retry(exc=Exception('No slots available in Spark JobServer.\n'
-                                'Details = {}'.format(response.text)))
-        elif error['result'] == 'context geoprocessing not found':
-            reboot_sjs_url = '{}/contexts?reset=reboot'.format(base_url)
-            context_response = requests.put(reboot_sjs_url)
-
-            if context_response.ok:
-                if retry is not None:
-                    retry(exc=Exception('Geoprocessing context missing in '
-                                        'Spark JobServer\nDetails = {}'.format(
-                                            context_response.text)))
-                else:
-                    raise Exception('Geoprocessing context missing in '
-                                    'Spark JobServer, but no retry was set.\n'
-                                    'Details = {}'.format(
-                                        context_response.text))
-
-            else:
-                raise Exception('Unable to create missing geoprocessing '
-                                'context in Spark JobServer.\n'
-                                'Details = {}'.format(context_response.text))
-        else:
-            raise Exception('Unable to submit job to Spark JobServer.\n'
-                            'Details = {}'.format(response.text))
-
-    if job['status'] == 'STARTED':
-        return job['result']['jobId']
-    else:
-        raise Exception('Submitted job did not start in Spark JobServer.\n'
-                        'Details = {}'.format(response.text))
-
-
-@statsd.timer(__name__ + '.sjs_retrieve')
-def sjs_retrieve(job_id, retry=None):
-    """
-    Given a job ID, will try to retrieve its value. If the job is
-    still running, will call the optional retry function before
-    proceeding.
-    """
-    host = settings.GEOP['host']
-    port = settings.GEOP['port']
+    geop_url = 'http://{}:{}/run'.format(host, port)

-    url = 'http://{}:{}/jobs/{}'.format(host, port, job_id)
    try:
-        response = requests.get(url)
+        response = requests.post(geop_url,
+                                 data=json.dumps(data),
+                                 headers={'Content-Type': 'application/json'})
    except ConnectionError as exc:
        if retry is not None:
            retry(exc=exc)

    if response.ok:
-        job = response.json()
+        return response.json()['result']
    else:
-        raise Exception('Unable to retrieve job {} from Spark JobServer.\n'
-                        'Details = {}'.format(job_id, response.text))
-
-    if job['status'] == 'FINISHED':
-        return job['result']
-    elif job['status'] == 'RUNNING':
-        if retry is not None:
-            try:
-                retry()
-            except MaxRetriesExceededError:
-                delete = requests.delete(url)  # Job took too long, terminate
-                if delete.ok:
-                    raise Exception('Job {} timed out, '
-                                    'deleted.'.format(job_id))
-                else:
-                    raise Exception('Job {} timed out, unable to delete.\n'
-                                    'Details: {}'.format(job_id, delete.text))
-    else:
-        if job['status'] == 'ERROR':
-            status = 'ERROR ({}: {})'.format(job['result']['errorClass'],
-                                             job['result']['message'])
-        else:
-            status = job['status']
-
-        delete = requests.delete(url)  # Job in unusual state, terminate
-        if delete.ok:
-            raise Exception('Job {} was {}, deleted'.format(job_id, status))
-        else:
-            raise Exception('Job {} was {}, could not delete.\n'
-                            'Details = {}'.format(job_id, status, delete.text))
+        raise Exception('Geoprocessing Error.\n'
+                        'Details: {}'.format(response.text))


def parse(sjs_result):
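The upshot for callers is that each geoprocessing operation is now a single task in the chain rather than a `start`/`finish` pair. A minimal sketch of the new wiring, assuming a Celery app with these tasks registered; `build_geop_chain` and `analyze_callback` are illustrative names, not part of this PR:

```python
from celery import chain

from apps.modeling.geoprocessing import run

def build_geop_chain(opname, data, wkaoi, callback):
    # Before this PR each operation needed two tasks:
    #     start.s(opname, data, wkaoi) | finish.s() | callback
    # Now the synchronous `run` performs the HTTP round-trip itself and
    # hands the callback either the results dict or {'error': ...}.
    return chain(run.s(opname, data, wkaoi), callback)

# Hypothetical usage with some registered callback task:
#     build_geop_chain('nlcd_soil', {'polygon': [aoi_json]}, wkaoi,
#                      analyze_callback.s()).apply_async()
```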
8 changes: 3 additions & 5 deletions src/mmw/apps/modeling/mapshed/tasks.py
@@ -8,7 +8,7 @@
from django.conf import settings
from django.contrib.gis.geos import GEOSGeometry

-from apps.modeling.geoprocessing import start, finish, parse
+from apps.modeling.geoprocessing import run, parse
from apps.modeling.mapshed.calcs import (day_lengths,
nearest_weather_stations,
growing_season,
@@ -424,10 +424,8 @@ def geoprocessing_chains(aoi, wkaoi, exchange, errback, choose_worker):
    ]

    return [
-        start.s(opname, data, wkaoi).set(exchange=exchange,
-                                         routing_key=worker) |
-        finish.s().set(exchange=exchange,
-                       routing_key=worker) |
+        run.s(opname, data, wkaoi).set(exchange=exchange,
+                                       routing_key=worker) |
Review comment (Contributor): I don't think that we need any of the exchange and routing_key stuff now that SJS is out of the picture. The main reason for it was to pin tasks to the worker where things started so that they'd finish there too. It may be another task, but probably worth doing.

Review comment (Contributor): @hectcastro We do have #2117 to address that, but it'd be helpful if you could comment on the statement I made in there about dark stack routing - that was my recollection, but I wasn't confident it was accurate.

        callback.s().set(link_error=errback,
                         exchange=exchange,
                         routing_key=choose_worker())
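For readers following the review thread above: `exchange` and `routing_key` were originally set on every signature so that both halves of a `start`/`finish` pair landed on the same worker. A minimal sketch of how that pinning works in Celery, with hypothetical app, exchange, and task names (nothing here is from this codebase):

```python
from celery import Celery, chain

app = Celery('sketch', broker='amqp://localhost')  # hypothetical broker URL

@app.task
def fetch(data):
    return data

@app.task
def summarize(result):
    return result

# Each signature carries its own routing options, so every step of the
# chain is delivered to the queue that one specific worker consumes.
pinned = chain(
    fetch.s({'aoi': '...'}).set(exchange='geop', routing_key='worker-1'),
    summarize.s().set(exchange='geop', routing_key='worker-1'),
)

# With a single synchronous `run` task there is no longer cross-task state
# to keep on one machine, which is why the thread suggests these options
# may now be removable (tracked in #2117).
```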
25 changes: 9 additions & 16 deletions src/mmw/apps/modeling/tests.py
@@ -423,11 +423,10 @@ def test_tr55_job_runs_in_chain(self):
                                                    self.job.id)

        # Make sure the chain is well-formed
-        self.assertTrue('geoprocessing.start' in str(job_chain[0]))
-        self.assertTrue('geoprocessing.finish' in str(job_chain[1]))
+        self.assertTrue('geoprocessing.run' in str(job_chain[0]))

        # Modify the chain to prevent it from trying to talk to endpoint
-        job_chain = [get_test_histogram.s()] + job_chain[2:]
+        job_chain = [get_test_histogram.s()] + job_chain[1:]
        task_list = chain(job_chain).apply_async()

        found_job = Job.objects.get(uuid=task_list.id)
@@ -461,8 +460,7 @@ def test_tr55_job_error_in_chain(self):
        job_chain = views._construct_tr55_job_chain(model_input,
                                                    self.job.id)

-        self.assertTrue('geoprocessing.start' in str(job_chain[0]))
-        self.assertTrue('geoprocessing.finish' in str(job_chain[1]))
+        self.assertTrue('geoprocessing.run' in str(job_chain[0]))

        job_chain = [get_test_histogram.s()] + job_chain[2:]

@@ -512,8 +510,7 @@ def test_tr55_chain_doesnt_generate_censuses_if_they_exist(self):
                                                    self.job.id)

        skipped_tasks = [
-            'start',
-            'finish',
+            'run',
            'nlcd_soil_census'
        ]

@@ -550,8 +547,7 @@ def test_tr55_chain_doesnt_generate_aoi_census_if_it_exists_and_mods(self):
        # Job chain is the same as if no census exists because
        # we still need to generate modification censuses
        needed_tasks = [
-            'start',
-            'finish',
+            'run',
            'nlcd_soil_census',
            'run_tr55'
        ]
@@ -576,7 +572,7 @@ def test_tr55_chain_doesnt_generate_aoi_census_if_it_exists_and_mods(self):
                             else False for t in needed_tasks]),
                        'missing necessary job in chain')

-        self.assertTrue(cached_argument in str(job_chain[3]))
+        self.assertTrue(cached_argument in str(job_chain[2]))

def test_tr55_chain_doesnt_generate_aoi_census_if_it_exists_and_no_mods(self): # noqa
"""If the AoI census exists in the model input, and there are no modifications,
@@ -597,8 +593,7 @@ def test_tr55_chain_doesnt_generate_aoi_census_if_it_exists_and_no_mods(self):
        self.model_input['modification_pieces'] = []

        skipped_tasks = [
-            'start',
-            'finish',
+            'run',
            'nlcd_soil_census',
        ]

@@ -658,8 +653,7 @@ def test_tr55_chain_generates_modification_censuses_if_they_are_old(self):
        skipped_tasks = []

        needed_tasks = [
-            'start',
-            'finish',
+            'run',
            'nlcd_soil_census',
            'run_tr55'
        ]
@@ -686,8 +680,7 @@ def test_tr55_chain_generates_both_censuses_if_they_are_missing(self):
        skipped_tasks = []

        needed_tasks = [
-            'start',
-            'finish',
+            'run',
            'nlcd_soil_census',
            'run_tr55'
        ]