Merge pull request #2193 from WikiWatershed/mjm/celery-crunch

Remove custom celery routing and exchange

Matthew McFarland committed Aug 25, 2017
2 parents b06a9e9 + 9ab6d71 commit 8577f11
Showing 4 changed files with 33 additions and 123 deletions.
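At a high level, the commit stops pinning each Celery signature to a single worker (via the worker-direct exchange 'C.dq' plus a routing key naming a live worker) and instead lets every task flow through the stack's default queue. The following minimal sketch illustrates the two styles; the app, tasks, and worker name are invented for illustration and are not code from this commit:

# Hedged sketch of the before/after routing styles; 'sketch', step_one,
# step_two, and the worker name are assumptions, not code from this PR.
from celery import Celery, chain

app = Celery('sketch', broker='amqp://localhost//')

@app.task
def step_one(x):
    return x + 1

@app.task
def step_two(x):
    return x * 2

# Before: each signature was pinned to one worker through the
# worker-direct exchange and that worker's node name as routing key.
pinned = chain(
    step_one.s(1).set(exchange='C.dq', routing_key='celery@worker1'),
    step_two.s().set(exchange='C.dq', routing_key='celery@worker1'),
)

# After: signatures carry no routing options, so the broker delivers
# them to CELERY_DEFAULT_QUEUE, and any worker on that queue runs them.
unpinned = chain(step_one.s(1), step_two.s())

Either chain is only constructed here; dispatching it with apply_async() would require a running broker.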
10 changes: 3 additions & 7 deletions src/mmw/apps/modeling/mapshed/tasks.py

@@ -410,8 +410,7 @@ def nlcd_kfactor(result):
     return output


-def geoprocessing_chains(aoi, wkaoi, exchange, errback, choose_worker):
-    worker = choose_worker()
+def geoprocessing_chains(aoi, wkaoi, errback):
     task_defs = [
         ('nlcd_soils', nlcd_soils, {'polygon': [aoi]}),
         ('gwn', gwn, {'polygon': [aoi]}),
@@ -424,11 +423,8 @@ def geoprocessing_chains(aoi, wkaoi, errback):
     ]

     return [
-        run.s(opname, data, wkaoi).set(exchange=exchange,
-                                       routing_key=worker) |
-        callback.s().set(link_error=errback,
-                         exchange=exchange,
-                         routing_key=choose_worker())
+        run.s(opname, data, wkaoi) |
+        callback.s().set(link_error=errback)
         for (opname, callback, data) in task_defs
     ]
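In views.py below, the list returned by geoprocessing_chains is wrapped in group(...) and piped into combine.s(); chaining a group into a signature is what Celery upgrades into a chord, which is why the chord-unlock shim in celery.py further down also had to drop its routing overrides. A self-contained sketch of that pattern (task names invented, not from this commit):

# Hedged sketch: a group chained into a callback becomes a chord.
from celery import Celery, group

app = Celery('sketch', broker='amqp://localhost//')

@app.task
def part(n):
    return n * n

@app.task
def combine_results(results):
    return sum(results)

# Celery turns `group | signature` into a chord: combine_results runs
# once, receiving the list of all the group's results.
workflow = group(part.s(i) for i in range(4)) | combine_results.s()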
132 changes: 26 additions & 106 deletions src/mmw/apps/modeling/views.py

@@ -3,13 +3,9 @@
 from __future__ import unicode_literals

 import json
-import random

-import celery
 from celery import chain, group

-from retry import retry
-
 from rest_framework.response import Response
 from rest_framework import decorators, status
 from rest_framework.permissions import (AllowAny,
@@ -39,11 +35,6 @@
                                         ScenarioSerializer)


-# When CELERY_WORKER_DIRECT = True, this exchange is automatically
-# created to allow direct communication with workers.
-MAGIC_EXCHANGE = 'C.dq'
-
-
 @decorators.api_view(['GET', 'POST'])
 @decorators.permission_classes((IsAuthenticated, ))
 def projects(request):
@@ -225,11 +216,9 @@ def start_gwlfe(request, format=None):

 def _initiate_gwlfe_job_chain(model_input, inputmod_hash, job_id):
     chain = (tasks.run_gwlfe.s(model_input, inputmod_hash)
-             .set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()) |
-             save_job_result.s(job_id, model_input)
-             .set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()))
-    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
-                                           routing_key=choose_worker())
+             | save_job_result.s(job_id, model_input))
+
+    errback = save_job_error.s(job_id)

     return chain.apply_async(link_error=errback)

@@ -261,26 +250,15 @@ def start_mapshed(request, format=None):


 def _initiate_mapshed_job_chain(mapshed_input, job_id):
-    workers = get_living_workers()
-    get_worker = lambda: random.choice(workers)
-    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
-                                           routing_key=get_worker())
+    errback = save_job_error.s(job_id)

     area_of_interest, wkaoi = parse_input(mapshed_input)

     job_chain = (
-        group(geoprocessing_chains(area_of_interest, wkaoi,
-                                   MAGIC_EXCHANGE, errback, choose_worker)) |
-        combine.s().set(
-            exchange=MAGIC_EXCHANGE,
-            routing_key=get_worker()) |
-        collect_data.s(area_of_interest).set(
-            link_error=errback,
-            exchange=MAGIC_EXCHANGE,
-            routing_key=get_worker()) |
-        save_job_result.s(job_id, mapshed_input).set(
-            exchange=MAGIC_EXCHANGE,
-            routing_key=get_worker()))
+        group(geoprocessing_chains(area_of_interest, wkaoi, errback)) |
+        combine.s() |
+        collect_data.s(area_of_interest).set(link_error=errback) |
+        save_job_result.s(job_id, mapshed_input))

     return chain(job_chain).apply_async(link_error=errback)

@@ -307,37 +285,29 @@ def export_gms(request, format=None):
 @decorators.permission_classes((AllowAny, ))
 def start_analyze_land(request, format=None):
     user = request.user if request.user.is_authenticated() else None
-    exchange = MAGIC_EXCHANGE
-    routing_key = choose_worker()

     area_of_interest, wkaoi = parse_input(request.POST['analyze_input'])

     geop_input = {'polygon': [area_of_interest]}

     return start_celery_job([
-        geoprocessing.run.s('nlcd', geop_input, wkaoi)
-        .set(exchange=exchange, routing_key=routing_key),
+        geoprocessing.run.s('nlcd', geop_input, wkaoi),
         tasks.analyze_nlcd.s(area_of_interest)
-        .set(exchange=exchange, routing_key=choose_worker())
     ], area_of_interest, user)


 @decorators.api_view(['POST'])
 @decorators.permission_classes((AllowAny, ))
 def start_analyze_soil(request, format=None):
     user = request.user if request.user.is_authenticated() else None
-    exchange = MAGIC_EXCHANGE
-    routing_key = choose_worker()

     area_of_interest, wkaoi = parse_input(request.POST['analyze_input'])

     geop_input = {'polygon': [area_of_interest]}

     return start_celery_job([
-        geoprocessing.run.s('soil', geop_input, wkaoi)
-        .set(exchange=exchange, routing_key=routing_key),
+        geoprocessing.run.s('soil', geop_input, wkaoi),
         tasks.analyze_soil.s(area_of_interest)
-        .set(exchange=exchange, routing_key=choose_worker())
     ], area_of_interest, user)


@@ -346,11 +316,9 @@ def start_analyze_soil(request, format=None):
 def start_analyze_animals(request, format=None):
     user = request.user if request.user.is_authenticated() else None
     area_of_interest, __ = parse_input(request.POST['analyze_input'])
-    exchange = MAGIC_EXCHANGE

     return start_celery_job([
         tasks.analyze_animals.s(area_of_interest)
-        .set(exchange=exchange, routing_key=choose_worker())
     ], area_of_interest, user)


@@ -359,11 +327,9 @@ def start_analyze_animals(request, format=None):
 def start_analyze_pointsource(request, format=None):
     user = request.user if request.user.is_authenticated() else None
     area_of_interest, __ = parse_input(request.POST['analyze_input'])
-    exchange = MAGIC_EXCHANGE

     return start_celery_job([
         tasks.analyze_pointsource.s(area_of_interest)
-        .set(exchange=exchange, routing_key=choose_worker())
     ], area_of_interest, user)


@@ -372,11 +338,9 @@ def start_analyze_pointsource(request, format=None):
 def start_analyze_catchment_water_quality(request, format=None):
     user = request.user if request.user.is_authenticated() else None
     area_of_interest, __ = parse_input(request.POST['analyze_input'])
-    exchange = MAGIC_EXCHANGE

     return start_celery_job([
         tasks.analyze_catchment_water_quality.s(area_of_interest)
-        .set(exchange=exchange, routing_key=choose_worker())
     ], area_of_interest, user)


@@ -411,39 +375,12 @@ def get_job(request, job_uuid, format=None):
     )


-def get_living_workers():
-    def predicate(worker_name):
-        return settings.STACK_COLOR in worker_name or 'debug' in worker_name
-
-    @retry(Exception, delay=0.5, backoff=2, tries=3)
-    def get_list_of_workers():
-        workers = celery.current_app.control.inspect().ping()
-
-        if workers is None:
-            raise Exception('Unable to receive a PONG from any workers')
-
-        return workers.keys()
-
-    workers = filter(predicate,
-                     get_list_of_workers())
-    return workers
-
-
-def choose_worker():
-    return random.choice(get_living_workers())
-
-
 def _initiate_rwd_job_chain(location, snapping, data_source,
                             job_id, testing=False):
-    exchange = MAGIC_EXCHANGE
-    routing_key = choose_worker()
-    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
-                                           routing_key=choose_worker())
+    errback = save_job_error.s(job_id)

-    return chain(tasks.start_rwd_job.s(location, snapping, data_source)
-                 .set(exchange=exchange, routing_key=routing_key),
-                 save_job_result.s(job_id, location)
-                 .set(exchange=exchange, routing_key=choose_worker())) \
+    return chain(tasks.start_rwd_job.s(location, snapping, data_source),
+                 save_job_result.s(job_id, location)) \
         .apply_async(link_error=errback)


@@ -468,15 +405,12 @@ def start_tr55(request, format=None):

 def _initiate_tr55_job_chain(model_input, job_id):
     job_chain = _construct_tr55_job_chain(model_input, job_id)
-    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
-                                           routing_key=choose_worker())
+    errback = save_job_error.s(job_id)

     return chain(job_chain).apply_async(link_error=errback)


 def _construct_tr55_job_chain(model_input, job_id):
-    exchange = MAGIC_EXCHANGE
-    routing_key = choose_worker()

     job_chain = []

@@ -500,23 +434,18 @@ def _construct_tr55_job_chain(model_input, job_id):
             census_hash == current_hash) or not pieces)):
         censuses = [aoi_census] + modification_census_items

-        job_chain.append(tasks.run_tr55.s(censuses, aoi, model_input)
-                         .set(exchange=exchange, routing_key=choose_worker()))
+        job_chain.append(tasks.run_tr55.s(censuses, aoi, model_input))
     else:
-        job_chain.append(tasks.nlcd_soil_census.s()
-                         .set(exchange=exchange, routing_key=choose_worker()))
+        job_chain.append(tasks.nlcd_soil_census.s())

         if aoi_census and pieces:
             polygons = [m['shape']['geometry'] for m in pieces]
             geop_input = {'polygon': [json.dumps(p) for p in polygons]}

             job_chain.insert(0, geoprocessing.run.s('nlcd_soil_census',
-                                                    geop_input)
-                             .set(exchange=exchange, routing_key=routing_key))
+                                                    geop_input))
             job_chain.append(tasks.run_tr55.s(aoi, model_input,
-                                              cached_aoi_census=aoi_census)
-                             .set(exchange=exchange,
-                                  routing_key=choose_worker()))
+                                              cached_aoi_census=aoi_census))
         else:
             polygons = [aoi] + [m['shape']['geometry'] for m in pieces]
             geop_input = {'polygon': [json.dumps(p) for p in polygons]}
@@ -525,14 +454,10 @@ def _construct_tr55_job_chain(model_input, job_id):

             job_chain.insert(0, geoprocessing.run.s('nlcd_soil_census',
                                                     geop_input,
-                                                    wkaoi)
-                             .set(exchange=exchange, routing_key=routing_key))
-            job_chain.append(tasks.run_tr55.s(aoi, model_input)
-                             .set(exchange=exchange,
-                                  routing_key=choose_worker()))
+                                                    wkaoi))
+            job_chain.append(tasks.run_tr55.s(aoi, model_input))

-    job_chain.append(save_job_result.s(job_id, model_input)
-                     .set(exchange=exchange, routing_key=choose_worker()))
+    job_chain.append(save_job_result.s(job_id, model_input))

     return job_chain

@@ -687,8 +612,7 @@ def drb_point_sources(request):
                         headers={'Cache-Control': 'max-age: 604800'})


-def start_celery_job(task_list, job_input, user=None,
-                     exchange=MAGIC_EXCHANGE, routing_key=None):
+def start_celery_job(task_list, job_input, user=None):
     """
     Given a list of Celery tasks and it's input, starts a Celery async job with
     those tasks, adds save_job_result and save_job_error handlers, and returns
@@ -697,19 +621,15 @@ def start_celery_job(task_list, job_input, user=None,
     :param task_list: A list of Celery tasks to execute. Is made into a chain
     :param job_input: Input to the first task, used in recording started jobs
     :param user: The user requesting the job. Optional.
-    :param exchange: Allows restricting jobs to specific exchange. Optional.
-    :param routing_key: Allows restricting jobs to specific workers. Optional.
     :return: A Response contianing the job id, marked as 'started'
     """
     created = now()
     job = Job.objects.create(created_at=created, result='', error='',
                              traceback='', user=user, status='started',
                              model_input=job_input)
-    routing_key = routing_key if routing_key else choose_worker()
-    success = save_job_result.s(job.id, job_input).set(exchange=exchange,
-                                                       routing_key=routing_key)
-    error = save_job_error.s(job.id).set(exchange=exchange,
-                                         routing_key=routing_key)
+
+    success = save_job_result.s(job.id, job_input)
+    error = save_job_error.s(job.id)

     task_list.append(success)
     task_chain = chain(task_list).apply_async(link_error=error)
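The pattern repeated throughout this file is chain(...).apply_async(link_error=errback): the errback signature fires if any task in the chain fails, and Celery prepends the failing task's id to the errback's partial arguments. A self-contained sketch of that convention (all names here are invented, not code from this commit):

# Hedged sketch of the link_error convention used in views.py above.
from celery import Celery, chain

app = Celery('sketch', broker='amqp://localhost//')

@app.task
def compute(model_input):
    return {'result': model_input}

@app.task
def save_result(result, job_id):
    print('job %s finished: %r' % (job_id, result))

@app.task
def save_error(task_id, job_id):
    # Celery calls the errback with the failing task's id prepended.
    print('job %s failed in task %s' % (job_id, task_id))

job_id = 42
workflow = chain(compute.s({'area': 1}), save_result.s(job_id))
# With a broker running:
#     workflow.apply_async(link_error=save_error.s(job_id))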
10 changes: 2 additions & 8 deletions src/mmw/mmw/celery.py

@@ -24,8 +24,6 @@ def add_unlock_chord_task_shim(app):
     from celery.exceptions import ChordError
     from celery.result import allow_join_result, result_from_tuple

-    from apps.modeling.views import choose_worker, MAGIC_EXCHANGE
-
     logger = logging.getLogger(__name__)

     MAX_RETRIES = settings.CELERY_CHORD_UNLOCK_MAX_RETRIES
@@ -53,14 +51,10 @@ def unlock_chord(self, group_id, callback, interval=None,
             ready = deps.ready()
         except Exception as exc:
             raise self.retry(
-                exc=exc, countdown=interval, max_retries=max_retries,
-                exchange=MAGIC_EXCHANGE, routing_key=choose_worker()
-            )
+                exc=exc, countdown=interval, max_retries=max_retries)
         else:
             if not ready:
-                raise self.retry(countdown=interval, max_retries=max_retries,
-                                 exchange=MAGIC_EXCHANGE,
-                                 routing_key=choose_worker())
+                raise self.retry(countdown=interval, max_retries=max_retries)

         callback = maybe_signature(callback, app=app)
         try:
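With the routing overrides gone, retry() simply re-publishes the task message with its original (now default-queue) routing, so there is nothing left to pin. A minimal sketch of a self-retrying task under that assumption; poll_until_ready and check_job are invented names, not the unlock_chord shim itself:

# Hedged sketch of self.retry without exchange/routing_key overrides.
from celery import Celery

app = Celery('sketch', broker='amqp://localhost//')

def check_job(job_id):
    # Stand-in readiness probe for the sketch.
    return False

@app.task(bind=True, max_retries=60)
def poll_until_ready(self, job_id):
    if not check_job(job_id):
        # The retried message keeps the task's default routing, so it
        # lands back on the default queue like any other task.
        raise self.retry(countdown=1)
    return job_id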
4 changes: 2 additions & 2 deletions src/mmw/mmw/settings/base.py

@@ -128,11 +128,11 @@ def get_env_setting(setting):
 CELERY_RESULT_SERIALIZER = 'json'
 CELERY_RESULT_BACKEND = 'djcelery.backends.cache:CacheBackend'
 STATSD_CELERY_SIGNALS = True
-CELERY_WORKER_DIRECT = True
 CELERY_CREATE_MISSING_QUEUES = True
 CELERY_CHORD_PROPAGATES = True
 CELERY_CHORD_UNLOCK_MAX_RETRIES = 60
 # CELERYD_CONCURRENCY = 2
-
+CELERY_DEFAULT_QUEUE = STACK_COLOR
+CELERY_DEFAULT_ROUTING_KEY = "task.%s" % STACK_COLOR
 # END CELERY CONFIGURATION
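With CELERY_DEFAULT_QUEUE set to the stack color, any task published without explicit routing lands on that stack's queue, and a worker only needs to consume it. A hedged sketch of what the settings imply; the literal 'blue' value and the 'mmw' app name are assumptions for illustration:

# Illustrative values only; STACK_COLOR is defined elsewhere in this
# settings module and comes from the deployment environment.
STACK_COLOR = 'blue'

CELERY_DEFAULT_QUEUE = STACK_COLOR
CELERY_DEFAULT_ROUTING_KEY = "task.%s" % STACK_COLOR

# A worker for this stack would then consume that queue, e.g.:
#   celery worker -A mmw -Q blue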
