Make pipelines properly propagate their targets to child pipelines
Summary:
There are 4 ways that a pipeline can start:
1.) It is started explicitly as a root pipeline.
2.) It is an immediately-runnable child of a pipeline that just ran (the
"fanout" case).
3.) It runs as the result of another pipeline finishing (which triggers a
barrier).
4.) It is a retry attempt for a pipeline.

Pipelines have a "target" parameter that specifies the version and module to run
on, but only the first case was actually using the target. This caused two
problems:
- The target was simply being ignored when spawning a child pipeline with a
specific target.
- The default target (the version/module the parent pipeline was running on)
was being ignored, so flipping the default code version while a pipeline is
running would cause the code to change for future child pipelines.

This commit correctly sets the target for the other three cases, which should
make explicitly-specified targets work and should make it so the version stays
the same across all pipelines unless otherwise specified.
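The propagation rule described above can be modeled with a small self-contained sketch (an illustration only, not the library's actual code; `resolve_target` is a hypothetical helper name):

```python
def resolve_target(parent_params, explicit_target=None):
    """Return the version/module target a child pipeline task should use.

    An explicitly-specified target wins; otherwise the child inherits the
    target recorded in its parent's params, so the running version stays
    pinned even if the app's default version is flipped mid-pipeline.
    """
    if explicit_target is not None:
        return explicit_target
    # Use .get() rather than [] so records written before the 'target'
    # param existed simply fall back to None (the app default).
    return parent_params.get('target')
```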

Test Plan:
Temporarily add a print statement to the taskqueue.Task constructor that prints
the given target whenever the URL ends with '/run'.
Run /admin/bigbingo/summarize_end_to_end_test . The first print statement gives
a target of None (which is expected from my last commit), and all other print
statements give explicitly-specified targets.
Delete my BigQuery token and re-run /admin/bigbingo/summarize_end_to_end_test .
The retry code is triggered and the retry tasks have the target explicitly
specified.
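The temporary debugging hook from the test plan can be sketched as a monkeypatch (a simplified stand-in: this `Task` class is a placeholder for App Engine's `taskqueue.Task`, not the real constructor):

```python
class Task(object):
    """Stand-in for taskqueue.Task, reduced to the fields we inspect."""
    def __init__(self, url, target=None, **kwargs):
        self.url = url
        self.target = target

# Temporarily wrap the constructor to print the given target whenever
# the task URL ends with '/run', as described in the test plan.
_original_init = Task.__init__

def _debug_init(self, url, target=None, **kwargs):
    if url.endswith('/run'):
        print('target for %s: %r' % (url, target))
    _original_init(self, url, target=target, **kwargs)

Task.__init__ = _debug_init
```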

Reviewers: chris, mattfaus

Reviewed By: mattfaus

Differential Revision: http://phabricator.khanacademy.org/D8599
Alan Pierce authored and MattFaus committed Jun 11, 2015
1 parent adb3fd1 commit 443e311
Showing 1 changed file with 38 additions and 26 deletions.
64 changes: 38 additions & 26 deletions python/src/pipeline/pipeline.py
@@ -1574,8 +1574,7 @@ def notify_barriers(self,
           continue
         blocking_slot_dict[slot_record.key()] = slot_record

-    task_list = []
-    updated_barriers = []
+    barriers_to_trigger = []
     for barrier in results:
       ready_slots = []
       for blocking_slot_key in barrier.blocking_slots:
@@ -1594,32 +1593,44 @@ def notify_barriers(self,
         # the task name tombstones.
         pending_slots = set(barrier.blocking_slots) - set(ready_slots)
         if not pending_slots:
-          if barrier.status != _BarrierRecord.FIRED:
-            barrier.status = _BarrierRecord.FIRED
-            barrier.trigger_time = self._gettime()
-            updated_barriers.append(barrier)
-
-          purpose = barrier.key().name()
-          if purpose == _BarrierRecord.START:
-            path = self.pipeline_handler_path
-            countdown = None
-          else:
-            path = self.finalized_handler_path
-            # NOTE: Wait one second before finalization to prevent
-            # contention on the _PipelineRecord entity.
-            countdown = 1
-          pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
-          logging.debug('Firing barrier %r', barrier.key())
-          task_list.append(taskqueue.Task(
-              url=path,
-              countdown=countdown,
-              name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
-              params=dict(pipeline_key=pipeline_key, purpose=purpose),
-              headers={'X-Ae-Pipeline-Key': pipeline_key}))
+          barriers_to_trigger.append(barrier)
         else:
           logging.debug('Not firing barrier %r, Waiting for slots: %r',
                         barrier.key(), pending_slots)

+    pipeline_keys_to_trigger = [
+        _BarrierRecord.target.get_value_for_datastore(barrier)
+        for barrier in barriers_to_trigger]
+    pipelines_to_trigger = dict(zip(
+        pipeline_keys_to_trigger, db.get(pipeline_keys_to_trigger)))
+    task_list = []
+    updated_barriers = []
+    for barrier in barriers_to_trigger:
+      if barrier.status != _BarrierRecord.FIRED:
+        barrier.status = _BarrierRecord.FIRED
+        barrier.trigger_time = self._gettime()
+        updated_barriers.append(barrier)
+
+      purpose = barrier.key().name()
+      if purpose == _BarrierRecord.START:
+        path = self.pipeline_handler_path
+        countdown = None
+      else:
+        path = self.finalized_handler_path
+        # NOTE: Wait one second before finalization to prevent
+        # contention on the _PipelineRecord entity.
+        countdown = 1
+      pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
+      target = pipelines_to_trigger[pipeline_key].params.get('target')
+      logging.debug('Firing barrier %r', barrier.key())
+      task_list.append(taskqueue.Task(
+          url=path,
+          countdown=countdown,
+          name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
+          params=dict(pipeline_key=pipeline_key, purpose=purpose),
+          headers={'X-Ae-Pipeline-Key': pipeline_key},
+          target=target))
+
     # Blindly overwrite _BarrierRecords that have an updated status. This is
     # acceptable because by this point all finalization barriers for
     # generator children should have already had their final outputs assigned.
@@ -2594,7 +2605,8 @@ def txn():
         params=dict(pipeline_key=pipeline_key,
                     purpose=_BarrierRecord.START,
                     attempt=pipeline_record.current_attempt),
-        headers={'X-Ae-Pipeline-Key': pipeline_key})
+        headers={'X-Ae-Pipeline-Key': pipeline_key},
+        target=pipeline_record.params.get('target'))
     task.add(queue_name=self.queue_name, transactional=True)

     pipeline_record.put()
@@ -2712,7 +2724,7 @@ def post(self):
       all_tasks.append(taskqueue.Task(
           url=context.pipeline_handler_path,
           params=dict(pipeline_key=pipeline_key),
-          target=child_pipeline.params['target'],
+          target=child_pipeline.params.get('target'),
           headers={'X-Ae-Pipeline-Key': pipeline_key},
           name='ae-pipeline-fan-out-' + child_pipeline.key().name()))