Skip to content

Commit

Permalink
Fixes #20465 - Polling sub tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
adamruzicka authored and iNecas committed Jul 31, 2017
1 parent 948b5f3 commit 11d44de
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 7 deletions.
33 changes: 28 additions & 5 deletions examples/sub_plans.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env ruby

example_description = <<DESC
Sub Plans Example
===================
Expand All @@ -16,20 +15,44 @@
require_relative 'example_helper'
require_relative 'orchestrate_evented'

COUNT = (ARGV[0] || 25).to_i

class SubPlansExample < Dynflow::Action
include Dynflow::Action::WithSubPlans
include Dynflow::Action::WithBulkSubPlans

def create_sub_plans
10.times.map { |i| trigger(OrchestrateEvented::CreateMachine, "host-#{i}", 'web_server') }
current_batch.map { |i| trigger(OrchestrateEvented::CreateMachine, "host-#{i}", 'web_server') }
end

def batch_size
5
end

def batch(from, size)
COUNT.times.drop(from).take(size)
end

def total_count
COUNT
end
end

class PollingSubPlansExample < SubPlansExample
include Dynflow::Action::WithPollingSubPlans
end

if $0 == __FILE__
triggered = ExampleHelper.world.trigger(SubPlansExample)
ExampleHelper.world.action_logger.level = Logger::INFO
ExampleHelper.world
t1 = ExampleHelper.world.trigger(SubPlansExample)
t2 = ExampleHelper.world.trigger(PollingSubPlansExample)
puts example_description
puts <<-MSG.gsub(/^.*\|/, '')
| Execution plan #{triggered.id} with sub plans triggered
| You can see the details at http://localhost:4567/#{triggered.id}
| Execution plans #{t1.id} and #{t2.id} with sub plans triggered
| You can see the details at
| http://localhost:4567/#{t2.id}
| http://localhost:4567/#{t1.id}
MSG

ExampleHelper.run_web_console
Expand Down
1 change: 1 addition & 0 deletions lib/dynflow/action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Action < Serializable
require 'dynflow/action/cancellable'
require 'dynflow/action/with_sub_plans'
require 'dynflow/action/with_bulk_sub_plans'
require 'dynflow/action/with_polling_sub_plans'

def self.all_children
children.values.inject(children.values) do |children, child|
Expand Down
16 changes: 14 additions & 2 deletions lib/dynflow/action/with_bulk_sub_plans.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,21 @@ def batch(from, size)

def run(event = nil)
if event === PlanNextBatch
spawn_plans if can_spawn_next_batch?
suspend
if can_spawn_next_batch?
spawn_plans
suspend
else
on_planning_finished
end
else
super
end
end

def on_planning_finished
suspend
end

def initiate
output[:planned_count] = 0
output[:total_count] = total_count
Expand Down Expand Up @@ -79,6 +87,10 @@ def cancel!(force = false)

private

def done?
!can_spawn_next_batch? && super
end

def can_spawn_next_batch?
total_count - output[:success_count] - output[:pending_count] - output[:failed_count] > 0
end
Expand Down
72 changes: 72 additions & 0 deletions lib/dynflow/action/with_polling_sub_plans.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
module Dynflow
module Action::WithPollingSubPlans

REFRESH_INTERVAL = 10
Poll = Algebrick.atom

def run(event = nil)
case event
when Poll
poll
else
super(event)
end
end

def poll
recalculate_counts
try_to_finish || suspend_and_ping
end

def initiate
ping suspended_action
super
end

def wait_for_sub_plans(sub_plans)
increase_counts(sub_plans.count, 0)
suspend
end

def resume
if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? }
output[:resumed_count] ||= 0
output[:resumed_count] += output[:failed_count]
# We're starting over and need to reset the counts
%w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) }
initiate
else
if self.is_a?(::Dynflow::Action::WithBulkSubPlans) && can_spawn_next_batch?
# Not everything was spawned
ping suspended_action
spawn_plans
suspend
else
poll
end
end
end

def notify_on_finish(_sub_plans)
suspend
end

def suspend_and_ping
suspend { |suspended_action| ping suspended_action }
end

def ping(suspended_action)
world.clock.ping suspended_action, REFRESH_INTERVAL, Poll
end

def recalculate_counts
total = sub_plans.count
failed = sub_plans('state' => %w(paused stopped), 'result' => 'error').count
success = sub_plans('state' => 'stopped', 'result' => 'success').count
output.update(:total_count => total - output.fetch(:resumed_count, 0),
:pending_count => 0,
:failed_count => failed - output.fetch(:resumed_count, 0),
:success_count => success)
end
end
end
186 changes: 186 additions & 0 deletions test/action_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,44 @@ def cancel_event?(event)
end
end

class PollingParentAction < ParentAction
include ::Dynflow::Action::WithPollingSubPlans
end

class PollingBulkParentAction < ParentAction
include ::Dynflow::Action::WithBulkSubPlans
include ::Dynflow::Action::WithPollingSubPlans

def poll
output[:poll] += 1
super
end

def on_planning_finished
output[:poll] = 0
output[:planning_finished] ||= 0
output[:planning_finished] += 1
super
end

def total_count
input[:count]
end

def batch_size
1
end

def create_sub_plans
current_batch.map { trigger(ChildAction, suspend: input[:suspend]) }
end

def batch(from, size)
total_count.times.drop(from).take(size)
end

end

let(:execution_plan) { world.trigger(ParentAction, count: 2).finished.value }

before do
Expand Down Expand Up @@ -450,6 +488,96 @@ def cancel_event?(event)
resumed_plan.state.must_equal :stopped
resumed_plan.result.must_equal :success
end

describe ::Dynflow::Action::WithPollingSubPlans do
include TestHelpers

let(:clock) { Dynflow::Testing::ManagedClock.new }
let(:polling_plan) { world.trigger(PollingParentAction, count: 2).finished.value }

specify "by default, when no sub plans were planned successfully, it calls create_sub_plans again" do
world.stub(:clock, clock) do
total = 2
FailureSimulator.fail_in_child_plan = true
triggered_plan = world.trigger(PollingParentAction, count: total)
polling_plan = world.persistence.load_execution_plan(triggered_plan.id)

wait_for do # Waiting for the sub plans to be spawned
polling_plan = world.persistence.load_execution_plan(triggered_plan.id)
polling_plan.sub_plans.count == total
end

# Moving the clock to make the parent check on sub plans
clock.pending_pings.count.must_equal 1
clock.progress

wait_for do # Waiting for the parent to realise the sub plans failed
polling_plan = world.persistence.load_execution_plan(triggered_plan.id)
polling_plan.state == :paused
end

FailureSimulator.fail_in_child_plan = false

world.execute(polling_plan.id) # The actual resume

wait_for do # Waiting for new generation of sub plans to be spawned
polling_plan.sub_plans.count == 2 * total
end

# Move the clock again
clock.pending_pings.count.must_equal 1
clock.progress

wait_for do # Waiting for everything to finish successfully
polling_plan = world.persistence.load_execution_plan(triggered_plan.id)
polling_plan.state == :stopped && polling_plan.result == :success
end
end
end

specify "by default it starts polling again" do
world.stub(:clock, clock) do
total = 2
FailureSimulator.fail_in_child_run = true
triggered_plan = world.trigger(PollingParentAction, count: total)
polling_plan = world.persistence.load_execution_plan(triggered_plan.id)

wait_for do # Waiting for the sub plans to be spawned
polling_plan = world.persistence.load_execution_plan(triggered_plan.id)
polling_plan.sub_plans.count == total &&
polling_plan.sub_plans.all? { |sub| sub.state == :paused }
end

# Moving the clock to make the parent check on sub plans
clock.pending_pings.count.must_equal 1
clock.progress
clock.pending_pings.count.must_equal 0

wait_for do # Waiting for the parent to realise the sub plans failed
polling_plan = world.persistence.load_execution_plan(triggered_plan.id)
polling_plan.state == :paused
end

FailureSimulator.fail_in_child_run = false

# Resume the sub plans
polling_plan.sub_plans.each do |sub|
world.execute(sub.id)
end

wait_for do # Waiting for the child tasks to finish
polling_plan.sub_plans.all? { |sub| sub.state == :stopped }
end

world.execute(polling_plan.id) # The actual resume

wait_for do # Waiting for everything to finish successfully
polling_plan = world.persistence.load_execution_plan(triggered_plan.id)
polling_plan.state == :stopped && polling_plan.result == :success
end
end
end
end
end

describe 'cancelling' do
Expand Down Expand Up @@ -486,6 +614,64 @@ def cancel_event?(event)
end
end
end

describe ::Dynflow::Action::WithPollingSubPlans do
include TestHelpers

let(:clock) { Dynflow::Testing::ManagedClock.new }

specify 'polls for sub plans state' do
world.stub :clock, clock do
total = 2
triggered_plan = world.trigger(PollingParentAction, count: total)
plan = world.persistence.load_execution_plan(triggered_plan.id)
plan.state.must_equal :planned
clock.pending_pings.count.must_equal 0
wait_for do
plan.sub_plans.count == total &&
plan.sub_plans.all? { |sub| sub.result == :success }
end
clock.pending_pings.count.must_equal 1
clock.progress
wait_for do
plan = world.persistence.load_execution_plan(triggered_plan.id)
plan.state == :stopped
end
clock.pending_pings.count.must_equal 0
end
end

specify 'starts polling for sub plans at the beginning' do
world.stub :clock, clock do
total = 2
triggered_plan = world.trigger(PollingBulkParentAction, count: total)
plan = world.persistence.load_execution_plan(triggered_plan.id)
assert_nil plan.entry_action.output[:planning_finished]
clock.pending_pings.count.must_equal 0
wait_for do
plan = world.persistence.load_execution_plan(triggered_plan.id)
plan.entry_action.output[:planning_finished] == 1
end
# Poll was set during #initiate
clock.pending_pings.count.must_equal 1

# Wait for the sub plans to finish
wait_for do
plan.sub_plans.count == total &&
plan.sub_plans.all? { |sub| sub.result == :success }
end

# Poll again
clock.progress
wait_for do
plan = world.persistence.load_execution_plan(triggered_plan.id)
plan.state == :stopped
end
plan.entry_action.output[:poll].must_equal 1
clock.pending_pings.count.must_equal 0
end
end
end
end
end
end

0 comments on commit 11d44de

Please sign in to comment.