Remove custom celery routing and exchange #2193

Merged: 1 commit, Aug 25, 2017
10 changes: 3 additions & 7 deletions src/mmw/apps/modeling/mapshed/tasks.py
@@ -410,8 +410,7 @@ def nlcd_kfactor(result):
     return output
 
 
-def geoprocessing_chains(aoi, wkaoi, exchange, errback, choose_worker):
-    worker = choose_worker()
+def geoprocessing_chains(aoi, wkaoi, errback):
     task_defs = [
         ('nlcd_soils', nlcd_soils, {'polygon': [aoi]}),
         ('gwn', gwn, {'polygon': [aoi]}),
@@ -424,11 +423,8 @@ def geoprocessing_chains(aoi, wkaoi, exchange, errback, choose_worker):
     ]
 
     return [
-        run.s(opname, data, wkaoi).set(exchange=exchange,
-                                       routing_key=worker) |
-        callback.s().set(link_error=errback,
-                         exchange=exchange,
-                         routing_key=choose_worker())
+        run.s(opname, data, wkaoi) |
+        callback.s().set(link_error=errback)
         for (opname, callback, data) in task_defs
     ]

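With the custom exchange and per-worker routing gone, each geoprocessing chain reduces to two linked signatures that ride the app's default queue. A minimal runnable sketch of that pattern, not code from this PR: the task names and bodies are hypothetical stand-ins, assuming Celery 3.x-era defaults as used by this project.

```python
from celery import Celery

app = Celery('sketch', broker='memory://')

@app.task
def run(opname, data, wkaoi):
    # Stand-in for the geoprocessing task.
    return {'op': opname}

@app.task
def callback(result):
    # Stand-in for the per-operation callback.
    return result

@app.task
def on_error(task_id):
    # Stand-in error handler; Celery 3.x passes the failed task's id.
    print('task %s failed' % task_id)

# No .set(exchange=..., routing_key=...) anywhere: both links inherit
# CELERY_DEFAULT_QUEUE / CELERY_DEFAULT_ROUTING_KEY from the app config.
sig = (run.s('nlcd_soils', {'polygon': []}, None) |
       callback.s().set(link_error=on_error.s()))
```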
132 changes: 26 additions & 106 deletions src/mmw/apps/modeling/views.py
@@ -3,13 +3,9 @@
 from __future__ import unicode_literals
 
 import json
-import random
 
-import celery
 from celery import chain, group
 
-from retry import retry
-
 from rest_framework.response import Response
 from rest_framework import decorators, status
 from rest_framework.permissions import (AllowAny,
@@ -39,11 +35,6 @@
                                ScenarioSerializer)
 
 
-# When CELERY_WORKER_DIRECT = True, this exchange is automatically
-# created to allow direct communication with workers.
-MAGIC_EXCHANGE = 'C.dq'
-
-
 @decorators.api_view(['GET', 'POST'])
 @decorators.permission_classes((IsAuthenticated, ))
 def projects(request):
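For context on the deleted constant: with `CELERY_WORKER_DIRECT = True`, Celery binds a dedicated queue for each worker to a shared direct exchange named `C.dq`, keyed by worker hostname, which is what made per-worker pinning possible. A hedged before-and-after sketch; the hostname is hypothetical.

```python
from celery import Celery

app = Celery('sketch', broker='memory://')
app.conf.CELERY_WORKER_DIRECT = True  # auto-creates per-worker queues on 'C.dq'

@app.task
def some_task(x):
    return x

# Before this PR: pin a signature to one named worker's direct queue.
pinned = some_task.s(1).set(exchange='C.dq', routing_key='worker1@example.com')

# After this PR: no routing options; the broker delivers to whichever
# worker consumes CELERY_DEFAULT_QUEUE.
unpinned = some_task.s(1)
```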
@@ -225,11 +216,9 @@ def start_gwlfe(request, format=None):
 
 def _initiate_gwlfe_job_chain(model_input, inputmod_hash, job_id):
     chain = (tasks.run_gwlfe.s(model_input, inputmod_hash)
-             .set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()) |
-             save_job_result.s(job_id, model_input)
-             .set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()))
-    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
-                                           routing_key=choose_worker())
+             | save_job_result.s(job_id, model_input))
+
+    errback = save_job_error.s(job_id)
 
     return chain.apply_async(link_error=errback)

@@ -261,26 +250,15 @@ def start_mapshed(request, format=None):
 
 
 def _initiate_mapshed_job_chain(mapshed_input, job_id):
-    workers = get_living_workers()
-    get_worker = lambda: random.choice(workers)
-    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
-                                           routing_key=get_worker())
+    errback = save_job_error.s(job_id)
 
     area_of_interest, wkaoi = parse_input(mapshed_input)
 
     job_chain = (
-        group(geoprocessing_chains(area_of_interest, wkaoi,
-                                   MAGIC_EXCHANGE, errback, choose_worker)) |
-        combine.s().set(
-            exchange=MAGIC_EXCHANGE,
-            routing_key=get_worker()) |
-        collect_data.s(area_of_interest).set(
-            link_error=errback,
-            exchange=MAGIC_EXCHANGE,
-            routing_key=get_worker()) |
-        save_job_result.s(job_id, mapshed_input).set(
-            exchange=MAGIC_EXCHANGE,
-            routing_key=get_worker()))
+        group(geoprocessing_chains(area_of_interest, wkaoi, errback)) |
+        combine.s() |
+        collect_data.s(area_of_interest).set(link_error=errback) |
+        save_job_result.s(job_id, mapshed_input))
 
     return chain(job_chain).apply_async(link_error=errback)
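The simplified mapshed chain is easier to read as a pattern: piping a `group` of parallel chains into a task creates an implicit chord, so the combining task receives the list of all group results once every member finishes. A runnable sketch under the same assumptions as above, with stand-in task bodies and hypothetical op names:

```python
from celery import Celery, group

app = Celery('sketch', broker='memory://')

@app.task
def geoprocess(op):
    # Stand-in for one geoprocessing chain.
    return {op: 42}

@app.task
def combine(results):
    # Receives the list of all group outputs (chord body).
    merged = {}
    for r in results:
        merged.update(r)
    return merged

@app.task
def collect_data(merged, aoi):
    return {'aoi': aoi, 'data': merged}

job_chain = (group(geoprocess.s(op) for op in ('nlcd_soils', 'gwn')) |
             combine.s() |
             collect_data.s('<aoi geojson>'))
```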

@@ -307,37 +285,29 @@ def export_gms(request, format=None):
 @decorators.permission_classes((AllowAny, ))
 def start_analyze_land(request, format=None):
     user = request.user if request.user.is_authenticated() else None
-    exchange = MAGIC_EXCHANGE
-    routing_key = choose_worker()
 
     area_of_interest, wkaoi = parse_input(request.POST['analyze_input'])
 
     geop_input = {'polygon': [area_of_interest]}
 
     return start_celery_job([
-        geoprocessing.run.s('nlcd', geop_input, wkaoi)
-        .set(exchange=exchange, routing_key=routing_key),
+        geoprocessing.run.s('nlcd', geop_input, wkaoi),
         tasks.analyze_nlcd.s(area_of_interest)
-        .set(exchange=exchange, routing_key=choose_worker())
     ], area_of_interest, user)
 
 
 @decorators.api_view(['POST'])
 @decorators.permission_classes((AllowAny, ))
 def start_analyze_soil(request, format=None):
     user = request.user if request.user.is_authenticated() else None
-    exchange = MAGIC_EXCHANGE
-    routing_key = choose_worker()
 
     area_of_interest, wkaoi = parse_input(request.POST['analyze_input'])
 
     geop_input = {'polygon': [area_of_interest]}
 
     return start_celery_job([
-        geoprocessing.run.s('soil', geop_input, wkaoi)
-        .set(exchange=exchange, routing_key=routing_key),
+        geoprocessing.run.s('soil', geop_input, wkaoi),
         tasks.analyze_soil.s(area_of_interest)
-        .set(exchange=exchange, routing_key=choose_worker())
     ], area_of_interest, user)


@@ -346,11 +316,9 @@ def start_analyze_soil(request, format=None):
 def start_analyze_animals(request, format=None):
     user = request.user if request.user.is_authenticated() else None
     area_of_interest, __ = parse_input(request.POST['analyze_input'])
-    exchange = MAGIC_EXCHANGE
 
     return start_celery_job([
         tasks.analyze_animals.s(area_of_interest)
-        .set(exchange=exchange, routing_key=choose_worker())
     ], area_of_interest, user)


@@ -359,11 +327,9 @@ def start_analyze_animals(request, format=None):
 def start_analyze_pointsource(request, format=None):
     user = request.user if request.user.is_authenticated() else None
     area_of_interest, __ = parse_input(request.POST['analyze_input'])
-    exchange = MAGIC_EXCHANGE
 
     return start_celery_job([
         tasks.analyze_pointsource.s(area_of_interest)
-        .set(exchange=exchange, routing_key=choose_worker())
     ], area_of_interest, user)


@@ -372,11 +338,9 @@ def start_analyze_pointsource(request, format=None):
 def start_analyze_catchment_water_quality(request, format=None):
     user = request.user if request.user.is_authenticated() else None
     area_of_interest, __ = parse_input(request.POST['analyze_input'])
-    exchange = MAGIC_EXCHANGE
 
     return start_celery_job([
         tasks.analyze_catchment_water_quality.s(area_of_interest)
-        .set(exchange=exchange, routing_key=choose_worker())
     ], area_of_interest, user)


@@ -411,39 +375,12 @@ def get_job(request, job_uuid, format=None):
     )
 
 
-def get_living_workers():
-    def predicate(worker_name):
-        return settings.STACK_COLOR in worker_name or 'debug' in worker_name
-
-    @retry(Exception, delay=0.5, backoff=2, tries=3)
-    def get_list_of_workers():
-        workers = celery.current_app.control.inspect().ping()
-
-        if workers is None:
-            raise Exception('Unable to receive a PONG from any workers')
-
-        return workers.keys()
-
-    workers = filter(predicate,
-                     get_list_of_workers())
-    return workers
-
-
-def choose_worker():
-    return random.choice(get_living_workers())
-
-
 def _initiate_rwd_job_chain(location, snapping, data_source,
                             job_id, testing=False):
-    exchange = MAGIC_EXCHANGE
-    routing_key = choose_worker()
-    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
-                                           routing_key=choose_worker())
-
-    return chain(tasks.start_rwd_job.s(location, snapping, data_source)
-                 .set(exchange=exchange, routing_key=routing_key),
-                 save_job_result.s(job_id, location)
-                 .set(exchange=exchange, routing_key=choose_worker())) \
+    errback = save_job_error.s(job_id)
+
+    return chain(tasks.start_rwd_job.s(location, snapping, data_source),
+                 save_job_result.s(job_id, location)) \
         .apply_async(link_error=errback)
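For reference, the deleted discovery code amounted to the sketch below: ask every worker for a pong, then pick one at random. Queue-based routing removes the need for it entirely, since the broker simply holds tasks on the stack's queue until a matching worker consumes them, with no ping round-trip and no risk of choosing a worker that has since died. This is a hedged reconstruction, minus the stack-color filtering and `retry` backoff of the original.

```python
import random

import celery

def choose_worker_sketch():
    # control.inspect().ping() returns {hostname: {'ok': 'pong'}} or None
    # if no worker replies in time.
    workers = celery.current_app.control.inspect().ping()

    if workers is None:
        raise Exception('Unable to receive a PONG from any workers')

    return random.choice(list(workers.keys()))
```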


@@ -468,15 +405,12 @@ def start_tr55(request, format=None):
 
 def _initiate_tr55_job_chain(model_input, job_id):
     job_chain = _construct_tr55_job_chain(model_input, job_id)
-    errback = save_job_error.s(job_id).set(exchange=MAGIC_EXCHANGE,
-                                           routing_key=choose_worker())
+    errback = save_job_error.s(job_id)
 
     return chain(job_chain).apply_async(link_error=errback)
 
 
 def _construct_tr55_job_chain(model_input, job_id):
-    exchange = MAGIC_EXCHANGE
-    routing_key = choose_worker()
 
     job_chain = []

@@ -500,23 +434,18 @@ def _construct_tr55_job_chain(model_input, job_id):
                  census_hash == current_hash) or not pieces)):
         censuses = [aoi_census] + modification_census_items
 
-        job_chain.append(tasks.run_tr55.s(censuses, aoi, model_input)
-                         .set(exchange=exchange, routing_key=choose_worker()))
+        job_chain.append(tasks.run_tr55.s(censuses, aoi, model_input))
     else:
-        job_chain.append(tasks.nlcd_soil_census.s()
-                         .set(exchange=exchange, routing_key=choose_worker()))
+        job_chain.append(tasks.nlcd_soil_census.s())
 
         if aoi_census and pieces:
             polygons = [m['shape']['geometry'] for m in pieces]
             geop_input = {'polygon': [json.dumps(p) for p in polygons]}
 
             job_chain.insert(0, geoprocessing.run.s('nlcd_soil_census',
-                                                    geop_input)
-                             .set(exchange=exchange, routing_key=routing_key))
+                                                    geop_input))
             job_chain.append(tasks.run_tr55.s(aoi, model_input,
-                                              cached_aoi_census=aoi_census)
-                             .set(exchange=exchange,
-                                  routing_key=choose_worker()))
+                                              cached_aoi_census=aoi_census))
         else:
             polygons = [aoi] + [m['shape']['geometry'] for m in pieces]
             geop_input = {'polygon': [json.dumps(p) for p in polygons]}
@@ -525,14 +454,10 @@
 
             job_chain.insert(0, geoprocessing.run.s('nlcd_soil_census',
                                                     geop_input,
-                                                    wkaoi)
-                             .set(exchange=exchange, routing_key=routing_key))
-            job_chain.append(tasks.run_tr55.s(aoi, model_input)
-                             .set(exchange=exchange,
-                                  routing_key=choose_worker()))
+                                                    wkaoi))
+            job_chain.append(tasks.run_tr55.s(aoi, model_input))
 
-    job_chain.append(save_job_result.s(job_id, model_input)
-                     .set(exchange=exchange, routing_key=choose_worker()))
+    job_chain.append(save_job_result.s(job_id, model_input))
 
     return job_chain
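`_construct_tr55_job_chain` now just assembles plain signatures in a Python list; `chain(job_chain).apply_async(...)` then links them in order, with each task's result fed to the next. A compact runnable sketch of that build-then-run shape, using a single hypothetical `step` task in place of the real ones:

```python
from celery import Celery, chain

app = Celery('sketch', broker='memory://')

@app.task
def step(acc, label):
    # Each link appends its label to the accumulated result.
    return acc + [label]

job_chain = [step.s('run_tr55')]          # middle of the pipeline
job_chain.insert(0, step.s([], 'geop'))   # prepend the geoprocessing step
job_chain.append(step.s('save_result'))   # append the result saver

# Runs geop -> run_tr55 -> save_result as one chain.
result = chain(job_chain).apply_async()
```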

@@ -687,8 +612,7 @@ def drb_point_sources(request):
                     headers={'Cache-Control': 'max-age: 604800'})
 
 
-def start_celery_job(task_list, job_input, user=None,
-                     exchange=MAGIC_EXCHANGE, routing_key=None):
+def start_celery_job(task_list, job_input, user=None):
     """
     Given a list of Celery tasks and its input, starts a Celery async job with
     those tasks, adds save_job_result and save_job_error handlers, and returns
@@ -697,19 +621,15 @@
     :param task_list: A list of Celery tasks to execute. Is made into a chain
     :param job_input: Input to the first task, used in recording started jobs
     :param user: The user requesting the job. Optional.
-    :param exchange: Allows restricting jobs to specific exchange. Optional.
-    :param routing_key: Allows restricting jobs to specific workers. Optional.
     :return: A Response containing the job id, marked as 'started'
     """
     created = now()
     job = Job.objects.create(created_at=created, result='', error='',
                              traceback='', user=user, status='started',
                              model_input=job_input)
-    routing_key = routing_key if routing_key else choose_worker()
-    success = save_job_result.s(job.id, job_input).set(exchange=exchange,
-                                                       routing_key=routing_key)
-    error = save_job_error.s(job.id).set(exchange=exchange,
-                                         routing_key=routing_key)
 
+    success = save_job_result.s(job.id, job_input)
+    error = save_job_error.s(job.id)
 
     task_list.append(success)
     task_chain = chain(task_list).apply_async(link_error=error)
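Callers of the slimmed-down helper now read like the analyze views above. A representative fragment, reusing names from the diff rather than standalone code:

```python
# Inside a view, after parsing the request (names as in the diff above):
return start_celery_job([
    geoprocessing.run.s('nlcd', geop_input, wkaoi),
    tasks.analyze_nlcd.s(area_of_interest),
], area_of_interest, user)
```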
10 changes: 2 additions & 8 deletions src/mmw/mmw/celery.py
@@ -24,8 +24,6 @@ def add_unlock_chord_task_shim(app):
     from celery.exceptions import ChordError
     from celery.result import allow_join_result, result_from_tuple
 
-    from apps.modeling.views import choose_worker, MAGIC_EXCHANGE
-
     logger = logging.getLogger(__name__)
 
     MAX_RETRIES = settings.CELERY_CHORD_UNLOCK_MAX_RETRIES
@@ -53,14 +51,10 @@ def unlock_chord(self, group_id, callback, interval=None,
             ready = deps.ready()
         except Exception as exc:
             raise self.retry(
-                exc=exc, countdown=interval, max_retries=max_retries,
-                exchange=MAGIC_EXCHANGE, routing_key=choose_worker()
-            )
+                exc=exc, countdown=interval, max_retries=max_retries)
         else:
             if not ready:
-                raise self.retry(countdown=interval, max_retries=max_retries,
-                                 exchange=MAGIC_EXCHANGE,
-                                 routing_key=choose_worker())
+                raise self.retry(countdown=interval, max_retries=max_retries)
 
     callback = maybe_signature(callback, app=app)
     try:
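The chord-unlock shim keeps its polling behavior; only the routing overrides go away, since a retried task simply re-enqueues where its queue configuration already sends it. A minimal sketch of that retry pattern, with a stand-in readiness check:

```python
from celery import Celery

app = Celery('sketch', broker='memory://')

@app.task(bind=True, max_retries=60)
def unlock_sketch(self, group_id):
    ready = False  # stand-in for deps.ready()
    if not ready:
        # Re-enqueues this task after 1s; no exchange or routing_key
        # override needed to land back on the right queue.
        raise self.retry(countdown=1)
```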
4 changes: 2 additions & 2 deletions src/mmw/mmw/settings/base.py
@@ -128,11 +128,11 @@ def get_env_setting(setting):
 CELERY_RESULT_SERIALIZER = 'json'
 CELERY_RESULT_BACKEND = 'djcelery.backends.cache:CacheBackend'
 STATSD_CELERY_SIGNALS = True
-CELERY_WORKER_DIRECT = True
 CELERY_CREATE_MISSING_QUEUES = True
 CELERY_CHORD_PROPAGATES = True
 CELERY_CHORD_UNLOCK_MAX_RETRIES = 60
 # CELERYD_CONCURRENCY = 2
+CELERY_DEFAULT_QUEUE = STACK_COLOR
+CELERY_DEFAULT_ROUTING_KEY = "task.%s" % STACK_COLOR
 # END CELERY CONFIGURATION


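The net effect of the settings change, sketched with an assumed stack color: every task that does not set its own routing options lands on the stack's queue, and each stack's workers consume only that queue (started with something like `celery worker -A mmw -Q blue`; exact invocation depends on the deployment).

```python
STACK_COLOR = 'blue'  # assumed; defined per deployment elsewhere in settings

CELERY_DEFAULT_QUEUE = STACK_COLOR
CELERY_DEFAULT_ROUTING_KEY = 'task.%s' % STACK_COLOR
# With the per-task .set(exchange=..., routing_key=...) overrides removed
# throughout the codebase, every signature now routes here by default.
```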