Skip to content

Commit

Permalink
fixes after NP review
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergio Medina committed Mar 2, 2017
1 parent 76edc77 commit e1afa8f
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 19 deletions.
32 changes: 16 additions & 16 deletions lib/rworkflow/flow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def lifecycle=(new_lifecycle)

def finished?
return false unless started?
total = counters.reduce(0) do |sum, pair|
total = self.counters.reduce(0) do |sum, pair|
self.class.terminal?(pair[0]) ? sum : (sum + pair[1].to_i)
end

Expand Down Expand Up @@ -91,21 +91,21 @@ def count(state)
end

def counters
counters = @storage.get(:counters)
if !counters.nil?
counters = begin
self.class.serializer.load(counters)
the_counters = @storage.get(:counters)
if !the_counters.nil?
the_counters = begin
self.class.serializer.load(the_counters)
rescue => e
Rails.logger.error("Error loading stored flow counters: #{e.message}")
nil
end
end
return counters || counters!
return the_counters || counters!
end

# fetches counters atomically
def counters!
counters = { processing: 0 }
the_counters = { processing: 0 }

names = @lifecycle.states.keys
results = RedisRds::Object.connection.multi do
Expand All @@ -115,12 +115,12 @@ def counters!
end

(self.class::STATES_TERMINAL + names).each do |name|
counters[name] = results.shift.to_i
the_counters[name] = results.shift.to_i
end

counters[:processing] = results.shift.reduce(0) { |sum, pair| sum + pair.last.to_i }
the_counters[:processing] = results.shift.reduce(0) { |sum, pair| sum + pair.last.to_i }

return counters
return the_counters
end
private :counters!

Expand Down Expand Up @@ -183,9 +183,9 @@ def terminate
post_process

if self.public?
counters = counters!
counters[:processing] = 0 # Some worker might have increased the processing flag at that time even if there is no more jobs to be done
@storage.setnx(:counters, self.class.serializer.dump(counters))
the_counters = self.counters!
the_counters[:processing] = 0 # Some worker might have increased the processing flag at that time even if there is no more jobs to be done
@storage.setnx(:counters, self.class.serializer.dump(the_counters))
states_cleanup
else
self.cleanup
Expand Down Expand Up @@ -318,7 +318,7 @@ def start(objects)
end

def total_objects_processed(counters = nil)
return (counters || counters).reduce(0) do |sum, pair|
return (counters || self.counters).reduce(0) do |sum, pair|
if self.class.terminal?(pair[0])
sum + pair[1]
else
Expand All @@ -328,11 +328,11 @@ def total_objects_processed(counters = nil)
end

def total_objects(counters = nil)
return (counters || counters).reduce(0) { |sum, pair| sum + pair[1] }
return (counters || self.counters).reduce(0) { |sum, pair| sum + pair[1] }
end

def total_objects_failed(counters = nil)
return (counters || counters).reduce(0) do |sum, pair|
return (counters || self.counters).reduce(0) do |sum, pair|
if self.class.failure?(pair[0])
sum + pair[1]
else
Expand Down
6 changes: 4 additions & 2 deletions lib/rworkflow/lifecycle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ class Lifecycle

CARDINALITY_ALL_STARTED = :all_started # Indicates a cardinality equal to the jobs pushed at the start of the workflow

DEFAULT_CARDINALITY = State::DEFAULT_CARDINALITY
STATE_POLICY_NO_WAIT = State::STATE_POLICY_NO_WAIT
DEFAULT_STATE_OPTIONS = {
cardinality: State::DEFAULT_CARDINALITY,
policy: State::STATE_POLICY_NO_WAIT
cardinality: self::DEFAULT_CARDINALITY,
policy: self::STATE_POLICY_NO_WAIT
}.freeze

def initialize(state_class: State, state_options: {})
Expand Down
2 changes: 1 addition & 1 deletion lib/rworkflow/sidekiq_flow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def pause
def continue
return if self.finished? || !self.valid? || !self.paused?
if @flow_data.decr(:paused) == 0
workers = Hash[counters.select { |name, _| !self.class.terminal?(name) && name != :processing }]
workers = Hash[self.counters.select { |name, _| !self.class.terminal?(name) && name != :processing }]

# enqueue jobs
workers.each { |worker, num_objects| create_jobs(worker, num_objects) }
Expand Down

0 comments on commit e1afa8f

Please sign in to comment.