Split up & speed up Mapshed job setup #1529
Conversation
```diff
@@ -130,6 +130,7 @@ def get_env_setting(setting):
 CELERY_CREATE_MISSING_QUEUES = True
 CELERY_CHORD_PROPAGATES = True
 CELERY_CHORD_UNLOCK_MAX_RETRIES = 60
+# CELERYD_CONCURRENCY = 2
```
Changing this setting had a measurable effect on the run time, as one would expect. I didn't observe any issues with a value of 2, but at 4+ the Celery workers would segfault, which is not a good look. At the same time, it may be worth considering tweaking this value in the future, especially for shorter tasks such as setting up Mapshed jobs, since it avoids the overhead of creating new instances.
We are currently running 2 Celery workers per machine, with a worker concurrency of 1. See `celery_number_of_workers` and `celery_processes_per_worker` here.
If we plan to tweak these settings, I think it would be good to keep the changes within Ansible so that they can live closer to the process that drives the instance type Celery resides on. Also, tweaking this setting will require monitoring Spark carefully, as the increase in workers can increase the number of things submitted to Spark at a time.
Good point about Spark. I definitely don't advocate tweaking it until we have some kind of test in place that can mimic each type of potential resource or concurrency problem, but given the time/money required it may prove to be a worthy trade at some point.
Could you elaborate on which method was blocking and how it was split up into parallel tasks?
```python
    workers = filter(predicate,
                     get_list_of_workers())
    return workers


def choose_worker():
```
Consider refactoring `choose_worker` to be defined in terms of `choose_workers` to reduce code duplication.
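The suggested refactor might look like the following sketch, where `choose_workers` is stubbed out (the real function queries Celery for the workers this deploy may use):

```python
import random


def choose_workers():
    # Stand-in for the real function, which asks Celery for the list of
    # workers the current (blue/green) deploy is allowed to use.
    return ['green-worker-1', 'green-worker-2', 'green-worker-3']


def choose_worker():
    # Defined in terms of choose_workers(), so the worker-discovery
    # logic lives in exactly one place.
    return random.choice(choose_workers())
```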
```
@@ -357,6 +366,22 @@ def get_job(request, job_uuid, format=None):
        }
    )


def choose_workers():
```
Consider renaming this to `get_active_workers`, or something similar. The output of this function is a list of all workers that the app is allowed to use (so if it's a green deploy, we will have access to all green workers but none of the blue ones). Thus, `choose_workers` is a bit misleading since we're not really making a choice.
```python
                         exchange=exchange,
                         routing_key=choose_worker()))
    for ((opname, data, callback), geop_worker) in tasks]


@shared_task(bind=True, default_retry_delay=1, max_retries=42)
```
I don't think we should be retrying the entire chain of `geop_task`. The `mapshed_start` and `mapshed_finish` tasks have retries specified on them and that should be enough.
Would that retry the entire task or just `geop_task` in the event of one `geop_task` failure?
Each `geop_task` is a little chain of `mapshed_start`, `mapshed_finish`, and a callback. We retry `mapshed_start` because it is possible that Spark JobServer's queue is full and there isn't room on the first try, but if we retry we may find room. We retry `mapshed_finish` because we're waiting for Spark JobServer to finish the task, and the retry acts as a check to see if the task is finished.

We don't retry the final callback because that is a pure function. If it doesn't work the first time, it will likely not work again given the same input.

Since the inner tasks are already being retried, we don't need to add another retry layer here: in case there is a failure, it would just retry and get the same failure each time.

Does that make sense?
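As a rough stand-in for that three-step chain (plain Python rather than actual Celery signatures; `run_with_retries` and the step names below are illustrative, not MMW code):

```python
def run_with_retries(step, arg, max_retries):
    # Mimics Celery's task-level retry: re-invoke the step until it
    # succeeds or the retry budget is exhausted.
    for attempt in range(max_retries + 1):
        try:
            return step(arg)
        except Exception:
            if attempt == max_retries:
                raise


def run_geop_chain(data, start, finish, callback):
    # start and finish are retried (the Spark JobServer queue may be
    # full, or the job may not be done yet); the callback is a pure
    # function, so it runs exactly once with no retry wrapper.
    job_id = run_with_retries(start, data, max_retries=3)
    result = run_with_retries(finish, job_id, max_retries=3)
    return callback(result)
```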
```python
def geop_tasks():
    return geop_task_defs().keys()


def geop_task_defs():
```
Can this be stored as a dictionary instead of a function?
Yeah, it would flatten things a bit as well.
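A sketch of that flattening, with hypothetical operation names standing in for the real MMW definitions:

```python
# Hypothetical sketch: a module-level dictionary mapping operation names
# to functions that build the (opname, data, callback) triple for a
# given geometry. The keys and callbacks here are illustrative only.
GEOP_TASK_DEFS = {
    'streams': lambda geom: ('streams', {'polygon': [geom]}, 'stream_callback'),
    'soil':    lambda geom: ('soil',    {'polygon': [geom]}, 'soil_callback'),
}


def geop_tasks():
    # The former geop_task_defs() indirection reduces to a key listing.
    return GEOP_TASK_DEFS.keys()
```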
```python
                 routing_key=geop_worker) |
        callback.s().set(link_error=errback,
                         exchange=exchange,
                         routing_key=geop_worker))
```
The callback doesn't need to happen on the same `geop_worker` as `mapshed_start` or `mapshed_finish`. We can safely use `choose_worker()` here.
```python
            routing_key=choose_worker()) |
        save_job_result.s(job_id, mapshed_input)
            .set(exchange=MAGIC_EXCHANGE, routing_key=choose_worker()))

    job_chain = []
```
Is this switch from the pipe syntax to creating a list and chaining at the end the main thing that provides the speed-up?
I think there was a small benefit to doing that, but the main block was `choose_worker`. I wasn't able to get a solid grasp on when parts of chained tasks ended up being evaluated, nor exactly how parallel the operations ended up being.
Ah, got it. I would recommend splitting the `choose_worker` refactor (whose performance benefits we are more confident about) off into its own commit, and keeping these stylistic changes separate.
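To make the stylistic part of the change concrete, here is a toy illustration (not Celery; a minimal `Sig` class standing in for a Celery signature) showing that the pipe syntax and the build-a-list-then-chain style compose the same pipeline:

```python
from functools import reduce


class Sig(object):
    # Toy stand-in for a Celery signature that supports the | operator.
    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other):
        # Composing a | b yields a signature that runs a, then b.
        return Sig(lambda x: other.fn(self.fn(x)))

    def __call__(self, x):
        return self.fn(x)


inc = Sig(lambda x: x + 1)
dbl = Sig(lambda x: x * 2)

# Pipe syntax: compose inline.
piped = inc | dbl

# List-then-chain style: accumulate steps, compose once at the end.
steps = [inc, dbl]
chained = reduce(lambda a, b: a | b, steps)
```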
```diff
@@ -27,13 +27,15 @@
 from apps.core.models import Job
 from apps.core.tasks import save_job_error, save_job_result
 from apps.modeling import tasks
-from apps.modeling.mapshed.tasks import geop_tasks, collect_data, combine
+from apps.modeling.mapshed.tasks import (geop_tasks, collect_data, combine,
+                                         geop_task_defs, geop_task, geop_tasks)
```
`geop_tasks` is imported twice, and `geop_task_defs` is never used.
```python
    geom = GEOSGeometry(json.dumps(mapshed_input['area_of_interest']),
                        srid=4326)

    chain = (group(geop_tasks(geom, errback, MAGIC_EXCHANGE, choose_worker)) |
```
Interesting that this used to shadow the Celery `chain` function; we must have overlooked that.
```
@@ -254,22 +256,29 @@ def start_mapshed(request, format=None):


def _initiate_mapshed_job_chain(mapshed_input, job_id):
    workers = choose_workers()
    get_worker = lambda: random.choice(workers)
```
This seems to be the most significant difference from the old version. Instead of making a query for a Celery worker for each task in the job chain, you query the list of workers once for the whole job chain.
I'd be curious to see how much of an impact this change has by itself, without any of the other changes.
Pooling the workers for each job made the biggest difference, but I did notice a small change just from splitting the task up as well, although I don't have as clear an idea of why exactly that might be the case. I think that the former implementation would be equivalent with enough workers or higher degrees of concurrency, but I'm similarly unsure about what happens in the event of failure in each case.
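The pooling difference can be sketched in plain Python (`get_living_workers` is stubbed here with a call counter; in MMW it is the expensive Celery inspection call measured at roughly 1200 ms per invocation):

```python
import random

calls = {'inspect': 0}


def get_living_workers():
    # Stand-in for the expensive Celery worker-discovery call.
    calls['inspect'] += 1
    return ['worker-a', 'worker-b']


def route_tasks_old(n_tasks):
    # Old behaviour: one worker-discovery call per task in the chain.
    return [random.choice(get_living_workers()) for _ in range(n_tasks)]


def route_tasks_new(n_tasks):
    # New behaviour: discover once, then pick from the bound list.
    workers = get_living_workers()
    get_worker = lambda: random.choice(workers)
    return [get_worker() for _ in range(n_tasks)]
```

The trade-off discussed below is that the bound list in `route_tasks_new` is a snapshot and could grow stale while the chain runs.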
For gathering performance benchmarks you can use flower. We had it installed at one point but it was removed. Quick start guide:

Update: I think I found the cause of this. Posted in a separate comment.
```python
    except Exception as x:
        return {
            'error': x.message
        }
```
I would strongly consider not swallowing exceptions here. If this is a general MMW design pattern, I would argue against following it. This makes debugging a lot more difficult.
When I first tried to run Mapshed I was getting a pretty mysterious error:
```
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/django/core/handlers/base.py", line 132, in get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/usr/local/lib/python2.7/dist-packages/django/views/decorators/csrf.py", line 58, in wrapped_view
    return view_func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/django/views/generic/base.py", line 71, in view
    return self.dispatch(request, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/rest_framework/views.py", line 452, in dispatch
    response = self.handle_exception(exc)
  File "/usr/local/lib/python2.7/dist-packages/rest_framework/views.py", line 449, in dispatch
    response = handler(request, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/rest_framework/decorators.py", line 50, in handler
    return func(*args, **kwargs)
  File "/opt/app/apps/modeling/views.py", line 246, in start_mapshed
    task_list = _initiate_mapshed_job_chain(mapshed_input, job.id)
  File "/opt/app/apps/modeling/views.py", line 293, in _initiate_mapshed_job_chain
    return chain(job_chain).apply_async(link_error=errback)
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 251, in apply_async
    return _apply(args, kwargs, **options)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/builtins.py", line 277, in apply_async
    tasks, results = self.prepare_steps(args, kwargs['tasks'])
  File "/usr/local/lib/python2.7/dist-packages/celery/app/builtins.py", line 237, in prepare_steps
    res = task.freeze()
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 539, in freeze
    task = maybe_signature(task, app=self._app).clone()
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 197, in clone
    s = Signature.from_dict({'task': self.task, 'args': tuple(args),
TypeError: 'NoneType' object is not iterable
```
After removing this try/catch it produced a much more useful error message:
```
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/django/core/handlers/base.py", line 132, in get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/usr/local/lib/python2.7/dist-packages/django/views/decorators/csrf.py", line 58, in wrapped_view
    return view_func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/django/views/generic/base.py", line 71, in view
    return self.dispatch(request, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/rest_framework/views.py", line 452, in dispatch
    response = self.handle_exception(exc)
  File "/usr/local/lib/python2.7/dist-packages/rest_framework/views.py", line 449, in dispatch
    response = handler(request, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/rest_framework/decorators.py", line 50, in handler
    return func(*args, **kwargs)
  File "/opt/app/apps/modeling/views.py", line 246, in start_mapshed
    task_list = _initiate_mapshed_job_chain(mapshed_input, job.id)
  File "/opt/app/apps/modeling/views.py", line 290, in _initiate_mapshed_job_chain
    print(c)
  File "/usr/local/lib/python2.7/dist-packages/celery/canvas.py", line 556, in __repr__
    return repr(self.tasks)
  File "/usr/lib/python2.7/UserList.py", line 16, in __repr__
    def __repr__(self): return repr(self.data)
  File "/usr/local/lib/python2.7/dist-packages/kombu/utils/__init__.py", line 325, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python2.7/dist-packages/celery/utils/functional.py", line 320, in data
    return list(self.__it)
  File "/opt/app/apps/modeling/views.py", line 277, in <genexpr>
    for t in geop_tasks()))
  File "/usr/local/lib/python2.7/dist-packages/celery/local.py", line 167, in <lambda>
    __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 420, in __call__
    return self.run(*args, **kwargs)
  File "/opt/app/apps/modeling/mapshed/tasks.py", line 494, in geop_task
    (opname, data, callback) = geop_task_defs()[taskName](geom)
  File "/opt/app/apps/modeling/mapshed/tasks.py", line 467, in <lambda>
    {'polygon': [geom.geojson], 'vector': streams(geom)},
  File "/opt/app/apps/modeling/mapshed/calcs.py", line 345, in streams
    cursor.execute(sql, [geom.wkt, geom.wkt])
  File "/usr/local/lib/python2.7/dist-packages/django/db/backends/utils.py", line 79, in execute
    return super(CursorDebugWrapper, self).execute(sql, params)
  File "/usr/local/lib/python2.7/dist-packages/django/db/backends/utils.py", line 64, in execute
    return self.cursor.execute(sql, params)
  File "/usr/local/lib/python2.7/dist-packages/django/db/utils.py", line 98, in __exit__
    six.reraise(dj_exc_type, dj_exc_value, traceback)
  File "/usr/local/lib/python2.7/dist-packages/django/db/backends/utils.py", line 64, in execute
    return self.cursor.execute(sql, params)
ProgrammingError: relation "nhdflowline" does not exist
LINE 6: FROM nhdflowline
```
It looks like I need to run `setupdb.sh` to load the correct data.
Good point. I just copied the block from one of the mapshed tasks without modification, but I had the DB set up already so I wouldn't have seen the exception.
Regarding swallowing exceptions: this was done in the first two tasks of the three-task chains here, because if there was an exception in the inner chain on any task except the last task, the chord would never unlock and Celery would be stuck in a loop forever.
Here's an excerpt from my upcoming blog post about the matter:
Error Handling in Nested Chains
Even though the MapShed workflow shown above is complex, with multiple specialized tasks, the error handling is very general. Regardless of the task that raises an exception or encounters an error, we simply want to log it to the database and return to the client. We assumed that, given its general applicability, we could specify the error handler once on the parent chain and have it catch errors raised anywhere in the constituent tasks. Unfortunately this is not true, and we must attach the error handler separately to every task in every subchain. We do this by defining the error handling task once, and passing it as a parameter to the function that creates all geoprocessing task chains.
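The wiring described above might be sketched like this (plain dicts standing in for Celery signatures, where the real code would call `sig.set(link_error=errback)` on each one; names are illustrative):

```python
def attach_errback(task_sigs, errback):
    # Celery does not propagate a chain-level link_error into nested
    # subchains, so the same handler must be attached to every
    # signature individually. Each "signature" here is just a dict
    # of task options for illustration.
    return [dict(sig, link_error=errback) for sig in task_sigs]


subchain = [{'task': 'mapshed_start'},
            {'task': 'mapshed_finish'},
            {'task': 'callback'}]

wired = attach_errback(subchain, 'save_job_error')
```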
I think that explanation sounds reasonable. However, in this case, the error I encountered caused a server error and never appeared in the frontend UI, syslog, or Kibana. Running `debugcelery` also did not reveal the actual error payload. I'm not sure if there's a flaw in our general approach, or if the refactoring in this task interfered with the existing error handling workflow.
I think the issue was the refactoring in this task. The forwarded error needs to be thrown by the next step, as done here:

```python
def mapshed_finish(self, incoming):
    if 'error' in incoming:
        return incoming
```

We really don't need to be doing this here, though.
I actually did install it but I didn't get to try it out.
It's difficult to review this without benchmarks. We don't have Celery configured in such a way as to easily collect and compare metrics. My instinct is that the change to how workers are selected per task is the biggest significant improvement, and that's a change we should definitely keep. The rest of the changes seem to be a slightly different way of doing the same thing. I would recommend splitting these changes into several commits and pull requests, so that we can test each change against the staging environment instead of speculating.
Breaking up the setup was actually (I thought) the main point of the issue, so that it would be easier to see what was happening. I incidentally noticed that the worker selection was a major source of blockage while I was trying to increase the parallelism.
I had previously mentioned that that was the major problem; it was about 1200 ms per invocation. I'm not familiar enough with Celery to know for sure whether the prior implementation of
```
@@ -256,24 +256,27 @@ def start_mapshed(request, format=None):


def _initiate_mapshed_job_chain(mapshed_input, job_id):
    workers = get_living_workers()
    get_worker = lambda: random.choice(workers)
```
We may be able to realize performance benefits on all other Celery tasks (not just Mapshed) if we apply this design pattern to every job. Maybe the `get_worker`/`choose_worker` function gets passed in as an argument to all of these `_initiate_xyz_job_chain` functions?
```python
    return chain.apply_async(link_error=errback)
    return chain(job_chain).apply_async(link_error=errback)
```
I suspect the only reason this was slow originally is because each of those geop tasks calls `choose_worker`. Since you fixed that, we may no longer need to refactor how the job chain is composed.
```python
    return workers


def choose_worker():
```
One potential downside to changing the way this works is that the list of workers `choose_worker()` operates on grows stale as work is being done. I don't know that that is super likely, but are we currently asking Celery for available workers enough that this optimization gets us a lot?
Correct me if I misunderstand your point, but within the context of `choose_worker`'s use I think the list of available workers will always be up-to-date at the time of invocation, since it's just a thin selector over `get_living_workers`, unless the Python interpreter does some type of caching or memoization internally. From the perspective of all consuming methods, I think the change to `choose_worker` is invisible; I split `get_living_workers` into a separate declaration so that I could bind the result for re-use specifically in `_initiate_mapshed_job` and leave every other use unaffected.

To your question, pinging Celery for each job was a bottleneck with noticeable effects, especially for job chains with more than a couple of jobs where the jobs themselves are not especially long-running. You're correct that the list of workers in the Mapshed chain could grow stale, but the chain itself isn't very long-running, and qualitatively I think there's some confidence in the stability of the workers over a 10+ second time frame.
I also created #1535 to address the concern of leaving changes in celery workers unhandled.
This addresses a couple issues with the setup process for Mapshed jobs. First, it breaks up the setup task into smaller tasks, which, in general, are easier to debug and profile both for purposes of this issue as well as in future. Second, it makes some minor changes to the implementation with the aim of speeding the process up in general, primarily by removing a notable blocking method invocation as well as parallelizing the process where possible.
To test:

- Update the `Vagrantfile` to bump up the settings for `worker`; I was testing it with 2 CPUs and 2048 MB memory.
- The `POST` to `/api/modeling/start/mapshed` should be faster than it was before or currently is on staging (the call on staging should take 18-20 seconds; this implementation could take 2-8 seconds depending on the VM resources). The overall model run was also faster in my testing, but there are several mitigating factors if a local version is compared to staging, so it's probably not worth spending much time evaluating.

Connects #1491