Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
adamruzicka committed Mar 7, 2017
1 parent c5e4024 commit f68280d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
27 changes: 16 additions & 11 deletions lib/dynflow/action/with_bulk_sub_plans.rb
Expand Up @@ -2,7 +2,7 @@ module Dynflow
module Action::WithBulkSubPlans
include Dynflow::Action::Cancellable

BATCH_SIZE = 10
DEFAULT_BATCH_SIZE = 100

# Should return a slice of size items starting from item with index from
def batch(from, size)
Expand All @@ -20,14 +20,25 @@ def run(event = nil)
end
end

def initiate
output[:planned_count] = 0
output[:total_count] = total_count
super
end

def increase_counts(planned, failed)
super(planned, failed, false)
output[:planned_count] += planned
end

# Should return the expected total count of tasks
def total_count
raise NotImplementedError
end

# Returns the items in the current batch
def current_batch
start_position = output[:total_count]
start_position = output[:planned_count]
size = start_position + batch_size > total_count ? total_count - start_position : batch_size
batch(start_position, size)
end
Expand All @@ -36,12 +47,6 @@ def batch_size
BATCH_SIZE
end

def done?
# The action is done if the real total count equal to the expected total count and all of them
# are either successful or failed
super && total_count == output[:total_count]
end

# The same logic as in Action::WithSubPlans, but calculated using the expected total count
def run_progress
if counts_set?
Expand All @@ -58,8 +63,8 @@ def spawn_plans
end

def cancel!
output[:failed_count] += total_count - output[:total_count]
output[:total_count] = total_count
# Count the not-yet-planned tasks as failed
output[:failed_count] += total_count - output[:planned_count]
if uses_concurrency_control
# Tell the throttle limiter to cancel the tasks its managing
world.throttle_limiter.cancel!(execution_plan_id)
Expand All @@ -76,7 +81,7 @@ def cancel!
private

def can_spawn_next_batch?
total_count > output[:total_count]
total_count - output[:success_count] - output[:pending_count] - output[:failed_count] > 0
end

end
Expand Down
19 changes: 10 additions & 9 deletions lib/dynflow/action/with_sub_plans.rb
Expand Up @@ -26,10 +26,6 @@ def run(event = nil)
end

def initiate
output.update(:total_count => 0,
:success_count => 0,
:failed_count => 0,
:pending_count => 0)
if uses_concurrency_control
calculate_time_distribution
world.throttle_limiter.initialize_plan(execution_plan_id, input[:concurrency_control])
Expand Down Expand Up @@ -112,18 +108,21 @@ def distribute_over_time(time_span, count)

def wait_for_sub_plans(sub_plans)
planned, failed = sub_plans.partition(&:planned?)

output[:total_count] += sub_plans.size
output[:failed_count] += failed.size
output[:pending_count] += planned.size

increase_counts(planned.count, failed.count)
if planned.any?
notify_on_finish(planned)
else
check_for_errors!
end
end

def increase_counts(planned, failed, track_total = true)
output[:total_count] = output.fetch(:total_count, 0) + planned + failed if track_total
output[:failed_count] = output.fetch(:failed_count, 0) + failed
output[:pending_count] = output.fetch(:pending_count, 0) + planned
output[:success_count] ||= 0
end

def try_to_finish
if done?
world.throttle_limiter.finish(execution_plan_id)
Expand All @@ -137,6 +136,8 @@ def try_to_finish

def resume
if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? }
# We're starting over and need to reset the counts
%w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) }
initiate
else
recalculate_counts
Expand Down

0 comments on commit f68280d

Please sign in to comment.