Skip to content

Commit

Permalink
Revert "Make pipelines properly propagate their targets to child pipe…
Browse files Browse the repository at this point in the history
…lines"

This reverts commit 443e311.
  • Loading branch information
tkaitchuck committed Aug 14, 2015
1 parent b3f56f0 commit 633c9fb
Showing 1 changed file with 26 additions and 38 deletions.
64 changes: 26 additions & 38 deletions python/src/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -1587,7 +1587,8 @@ def notify_barriers(self,
continue
blocking_slot_dict[slot_record.key()] = slot_record

barriers_to_trigger = []
task_list = []
updated_barriers = []
for barrier in results:
ready_slots = []
for blocking_slot_key in barrier.blocking_slots:
Expand All @@ -1606,44 +1607,32 @@ def notify_barriers(self,
# the task name tombstones.
pending_slots = set(barrier.blocking_slots) - set(ready_slots)
if not pending_slots:
barriers_to_trigger.append(barrier)
if barrier.status != _BarrierRecord.FIRED:
barrier.status = _BarrierRecord.FIRED
barrier.trigger_time = self._gettime()
updated_barriers.append(barrier)

purpose = barrier.key().name()
if purpose == _BarrierRecord.START:
path = self.pipeline_handler_path
countdown = None
else:
path = self.finalized_handler_path
# NOTE: Wait one second before finalization to prevent
# contention on the _PipelineRecord entity.
countdown = 1
pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
logging.debug('Firing barrier %r', barrier.key())
task_list.append(taskqueue.Task(
url=path,
countdown=countdown,
name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
params=dict(pipeline_key=pipeline_key, purpose=purpose),
headers={'X-Ae-Pipeline-Key': pipeline_key}))
else:
logging.debug('Not firing barrier %r, Waiting for slots: %r',
barrier.key(), pending_slots)

pipeline_keys_to_trigger = [
_BarrierRecord.target.get_value_for_datastore(barrier)
for barrier in barriers_to_trigger]
pipelines_to_trigger = dict(zip(
pipeline_keys_to_trigger, db.get(pipeline_keys_to_trigger)))
task_list = []
updated_barriers = []
for barrier in barriers_to_trigger:
if barrier.status != _BarrierRecord.FIRED:
barrier.status = _BarrierRecord.FIRED
barrier.trigger_time = self._gettime()
updated_barriers.append(barrier)

purpose = barrier.key().name()
if purpose == _BarrierRecord.START:
path = self.pipeline_handler_path
countdown = None
else:
path = self.finalized_handler_path
# NOTE: Wait one second before finalization to prevent
# contention on the _PipelineRecord entity.
countdown = 1
pipeline_key = _BarrierRecord.target.get_value_for_datastore(barrier)
target = pipelines_to_trigger[pipeline_key].params.get('target')
logging.debug('Firing barrier %r', barrier.key())
task_list.append(taskqueue.Task(
url=path,
countdown=countdown,
name='ae-barrier-fire-%s-%s' % (pipeline_key.name(), purpose),
params=dict(pipeline_key=pipeline_key, purpose=purpose),
headers={'X-Ae-Pipeline-Key': pipeline_key},
target=target))

# Blindly overwrite _BarrierRecords that have an updated status. This is
# acceptable because by this point all finalization barriers for
# generator children should have already had their final outputs assigned.
Expand Down Expand Up @@ -2608,8 +2597,7 @@ def txn():
params=dict(pipeline_key=pipeline_key,
purpose=_BarrierRecord.START,
attempt=pipeline_record.current_attempt),
headers={'X-Ae-Pipeline-Key': pipeline_key},
target=pipeline_record.params.get('target'))
headers={'X-Ae-Pipeline-Key': pipeline_key})
task.add(queue_name=self.queue_name, transactional=True)

pipeline_record.put()
Expand Down Expand Up @@ -2727,7 +2715,7 @@ def post(self):
all_tasks.append(taskqueue.Task(
url=context.pipeline_handler_path,
params=dict(pipeline_key=pipeline_key),
target=child_pipeline.params.get('target'),
target=child_pipeline.params['target'],
headers={'X-Ae-Pipeline-Key': pipeline_key},
name='ae-pipeline-fan-out-' + child_pipeline.key().name()))

Expand Down

3 comments on commit 633c9fb

@tkaitchuck
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alangpierce Reverting this for the moment. It broke some tests that I haven't gotten to the bottom of yet.

@sit
Copy link

@sit sit commented on 633c9fb Jan 25, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any luck on figuring this out? It would be nice to propagate queue names.

@alangpierce
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm not working on this stuff anymore. The commit probably still works, so you could try applying it to a fork, but I'm not sure what's needed to make the tests pass.

Please sign in to comment.