Skip to content

Commit

Permalink
Handle batch and regular planning consistently
Browse files Browse the repository at this point in the history
  • Loading branch information
adamruzicka committed Jul 26, 2017
1 parent 561582a commit f9509c4
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 58 deletions.
28 changes: 14 additions & 14 deletions lib/dynflow/action/with_polling_sub_plans.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@ def poll
try_to_finish || suspend_and_ping
end

def wait_for_sub_plans(sub_plans)
increase_counts(sub_plans.count, 0)
if is_a?(::Dynflow::Action::WithBulkSubPlans)
suspend
else
poll
end
def initiate
ping suspended_action
super
end

def on_planning_finished
poll
def wait_for_sub_plans(sub_plans)
increase_counts(sub_plans.count, 0)
suspend
end

def resume
Expand All @@ -38,6 +35,8 @@ def resume
initiate
else
if self.is_a?(::Dynflow::Actions::WithBulkSubPlans) && can_spawn_next_batch?
# Not everything was spawned
ping suspended_action
spawn_plans
suspend
else
Expand All @@ -46,15 +45,16 @@ def resume
end
end


def notify_on_finish(_sub_plans)
suspend_and_ping
suspend
end

def suspend_and_ping
suspend do |suspended_action|
world.clock.ping suspended_action, REFRESH_INTERVAL, Poll
end
suspend { |suspended_action| ping suspended_action }
end

def ping(suspended_action)
world.clock.ping suspended_action, REFRESH_INTERVAL, Poll
end

def recalculate_counts
Expand Down
117 changes: 73 additions & 44 deletions test/action_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,45 @@ def run(event = nil)
end
end

class PollingParentAction < ParentAction
include ::Dynflow::Action::WithPollingSubPlans
end

class PollingBulkParentAction < PollingParentAction
include ::Dynflow::Action::WithBulkSubPlans
include ::Dynflow::Action::WithPollingSubPlans

def on_planning_finished
output[:poll] = 0
output[:planning_finished] ||= 0
output[:planning_finished] += 1
super
end

def poll
output[:poll] += 1
super
end


def total_count
input[:count]
end

def batch_size
1
end

def create_sub_plans
current_batch.map { trigger(ChildAction, suspend: input[:suspend]) }
end

def batch(from, size)
total_count.times.drop(from).take(size)
end

end

let(:execution_plan) { world.trigger(ParentAction, count: 2).finished.value }

before do
Expand Down Expand Up @@ -442,6 +481,34 @@ def run(event = nil)
resumed_plan.state.must_equal :stopped
resumed_plan.result.must_equal :success
end

describe ::Dynflow::Action::WithPollingSubPlans do
include TestHelpers

let(:klok) { Dynflow::Testing::ManagedClock.new }

specify "by default it starts polling again" do
world.stub(:clock, klok) do
total = 2
FailureSimulator.fail_in_child_run = true
triggered_plan = world.trigger(PollingParentAction, count: total)
wait_for do
plan = world.persistence.load_execution_plan(triggered_plan.id)
plan.sub_plans.count == total &&
plan.sub_plans.all? { |sub| sub.state == :paused }
end
plan = world.persistence.load_execution_plan(triggered_plan.id)
plan.entry_action.output[:poll].must_equal 1
plan.status.must_equal :paused
FailureSimulator.fail_in_child_run = false
world.execute(plan.id)

wait_for do
plan.sub_plans.all? { |sub| sub.result == :warning }
end
end
end
end
end

describe 'cancelling' do
Expand All @@ -465,51 +532,15 @@ def run(event = nil)
describe ::Dynflow::Action::WithPollingSubPlans do
include TestHelpers

class PollingParentAction < ParentAction
include ::Dynflow::Action::WithPollingSubPlans
end

class PollingBulkParentAction < ParentAction
include ::Dynflow::Action::WithBulkSubPlans
include ::Dynflow::Action::WithPollingSubPlans

def total_count
input[:count]
end

def batch_size
1
end

def create_sub_plans
current_batch.map { trigger(ChildAction, suspend: input[:suspend]) }
end

def on_planning_finished
output[:poll] = 0
output[:planning_finished] ||= 0
output[:planning_finished] += 1
super
end

def poll
output[:poll] += 1
super
end

def batch(from, size)
total_count.times.drop(from).take(size)
end

end

let(:klok) { Dynflow::Testing::ManagedClock.new }

specify 'polls for sub plans state' do
world.stub :clock, klok do
total = 2
triggered_plan = world.trigger(PollingParentAction, count: total)
plan = world.persistence.load_execution_plan(triggered_plan.id)
plan.state.must_equal :planned
klok.pending_pings.count.must_equal 0
wait_for do
plan.sub_plans.count == total &&
plan.sub_plans.all? { |sub| sub.result == :success }
Expand All @@ -524,20 +555,18 @@ def batch(from, size)
end
end

specify 'polls for sub plans after all batches were planned' do
specify 'starts polling for sub plans at the beginning' do
world.stub :clock, klok do
total = 2
triggered_plan = world.trigger(PollingBulkParentAction, count: total)
plan = world.persistence.load_execution_plan(triggered_plan.id)
assert_nil plan.entry_action.output[:planning_finished]
klok.pending_pings.count.must_equal 0
wait_for do
plan = world.persistence.load_execution_plan(triggered_plan.id)
plan.entry_action.output[:planning_finished] == 1
end
# It polls when the tasks are planned
plan.entry_action.output[:poll].must_equal 1

# We're not done yet
# Poll was set during #initiate
klok.pending_pings.count.must_equal 1

# Wait for the sub plans to finish
Expand All @@ -552,7 +581,7 @@ def batch(from, size)
plan = world.persistence.load_execution_plan(triggered_plan.id)
plan.state == :stopped
end
plan.entry_action.output[:poll].must_equal 2
plan.entry_action.output[:poll].must_equal 1
klok.pending_pings.count.must_equal 0
end
end
Expand Down

0 comments on commit f9509c4

Please sign in to comment.