Skip to content

Commit

Permalink
Merge e5b9ce0 into 3cd57bd
Browse files Browse the repository at this point in the history
  • Loading branch information
serch committed Mar 1, 2017
2 parents 3cd57bd + e5b9ce0 commit ba7cd7e
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 86 deletions.
5 changes: 2 additions & 3 deletions lib/rworkflow/lifecycle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ class Lifecycle

DEFAULT_STATE_OPTIONS = {
cardinality: State::DEFAULT_CARDINALITY,
priority: State::DEFAULT_PRIORITY,
policy: State::STATE_POLICY_NO_WAIT
}
}.freeze

def initialize(state_class: State, state_options: {})
@state_options = DEFAULT_STATE_OPTIONS.merge(state_options)
Expand All @@ -30,7 +29,7 @@ def state(name, options = {})

def transition(from, name)
from_state = @states[from]
fail(StateError, from) if from_state.nil?
raise(StateError, from) if from_state.nil?

return from_state.perform(name, @default)
end
Expand Down
27 changes: 7 additions & 20 deletions lib/rworkflow/sidekiq_flow.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
module Rworkflow
class SidekiqFlow < Flow

STATE_POLICY_GATED = :gated
MAX_EXPECTED_DURATION = 4.hours
PRIORITIES = [:critical, :high, nil, :low]

def initialize(id)
super(id)
Expand All @@ -12,7 +10,7 @@ def initialize(id)

def cleanup
super()
@open_gates.delete()
@open_gates.delete
end

def push(objects, name)
Expand All @@ -32,7 +30,7 @@ def paused?
end

def status
return (paused?) ? 'Paused' : super()
return paused? ? 'Paused' : super()
end

def pause
Expand Down Expand Up @@ -72,21 +70,16 @@ def create_jobs(state_name, num_objects)
if !worker_class.nil?
cardinality = get_state_cardinality(state_name)

if state.policy == State::STATE_POLICY_WAIT
amount = ((num_objects + get_state_list(state_name).size) / cardinality.to_f).floor
amount = if state.policy == State::STATE_POLICY_WAIT
((num_objects + get_state_list(state_name).size) / cardinality.to_f).floor
else
amount = (num_objects / cardinality.to_f).ceil
(num_objects / cardinality.to_f).ceil
end

state_priority = self.priority || state.priority
amount.times { worker_class.enqueue_job_with_priority(state_priority, @id, state_name) }
amount.times { worker_class.enqueue_job(@id, state_name) }
end
end

def priority
return @priority ||= begin self.get(:priority) end
end

def gated?(state_name)
state = @lifecycle.states[state_name]
return state.policy == STATE_POLICY_GATED && !@open_gates.include?(state_name)
Expand All @@ -105,15 +98,9 @@ def close_gate(state_name)
class << self
def create(lifecycle, name = '', options)
workflow = super(lifecycle, name, options)
workflow.set(:priority, options[:priority]) unless options[:priority].nil?

return workflow
end

def get_manual_priority
return :high
end

def cleanup_broken_flows
broken = []
flows = self.all
Expand Down Expand Up @@ -147,7 +134,7 @@ def enqueue_missing_jobs

def build_flow_map
flow_map = {}
queues = SidekiqHelper.get_queue_sizes.keys
queues = SidekiqHelper.queue_sizes.keys
queues.each do |queue_name|
queue = Sidekiq::Queue.new(queue_name)
queue.each do |job|
Expand Down
43 changes: 7 additions & 36 deletions lib/rworkflow/sidekiq_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,14 @@
module Rworkflow
module SidekiqHelper
def self.included(klass)
klass.send :extend, ClassMethods
klass.send :include, InstanceMethods
klass.send :extend, ClassMethods
end

module ClassMethods
# Mix-in methods
def enqueue_job(*params)
enqueue_job_with_priority(nil, *params)
end

def enqueue_job_with_priority(priority, *params)
if should_perform_job_async?
self.perform_with_priority(priority, *params)
else
inline_perform(params)
end
end

def enqueue_job_at(at_time, *params)
if should_perform_job_async?
self.perform_at(at_time, *params)
else
inline_perform(params)
end
end

def enqueue_job_in(time_diff, *params)
if should_perform_job_async?
self.perform_in(time_diff, *params)
self.perform_async(*params)
else
inline_perform(params)
end
Expand All @@ -50,35 +29,27 @@ def inline_perform(params)
end
end

module InstanceMethods
end

# Static methods
class << self
def configure_server host, port, db
def configure_server(host, port, db)
Sidekiq.configure_server do |config|
config.redis = {:url => "redis://#{host}:#{port}/#{db}", :namespace => 'sidekiq'}
config.redis = { url: "redis://#{host}:#{port}/#{db}", namespace: 'sidekiq' }
config.server_middleware do |chain|
chain.add SidekiqServerMiddleware
end
end
end

def configure_client host, port, db
def configure_client(host, port, db)
Sidekiq.configure_client do |config|
config.redis = {:url => "redis://#{host}:#{port}/#{db}", :namespace => 'sidekiq'}
config.redis = { url: "redis://#{host}:#{port}/#{db}", namespace: 'sidekiq' }
end
end

def get_queue_sizes
def queue_sizes
stats = Sidekiq::Stats.new
return stats.queues
end

def get_queue_sizes_sum
stats = Sidekiq::Stats.new
return stats.enqueued
end
end
end
end
22 changes: 7 additions & 15 deletions lib/rworkflow/state.rb
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
module Rworkflow
class State
DEFAULT_CARDINALITY = 1
DEFAULT_PRIORITY = nil

# To be refactored into Policy objects
STATE_POLICY_WAIT = :wait
STATE_POLICY_NO_WAIT = :no_wait

attr_accessor :cardinality, :priority, :policy
attr_accessor :cardinality, :policy
attr_reader :transitions

def initialize(cardinality: DEFAULT_CARDINALITY, priority: DEFAULT_PRIORITY, policy: STATE_POLICY_NO_WAIT, **_)
def initialize(cardinality: DEFAULT_CARDINALITY, policy: STATE_POLICY_NO_WAIT, **_)
@cardinality = cardinality
@priority = priority
@policy = policy

@transitions = {}
end

Expand All @@ -24,14 +21,13 @@ def transition(name, to)

def perform(name, default = nil)
to_state = @transitions[name] || default
raise TransitionError.new(name) if to_state.nil?
raise(TransitionError, name) if to_state.nil?
return to_state
end

# Default rule: new state overwrites old state when applicable
def merge!(state)
@cardinality = state.cardinality
@priority = state.priority
@policy = state.policy

@transitions.merge!(state.transitions) do |_, _, transition|
Expand All @@ -46,23 +42,19 @@ def merge(state)
end

def clone
cloned = self.class.new(cardinality: @cardinality, priority: @priority, policy: @policy)
cloned = self.class.new(cardinality: @cardinality, policy: @policy)
@transitions.each { |from, to| cloned.transition(from, to) }
return cloned
end

def ==(state)
return @cardinality == state.cardinality &&
@priority == state.priority &&
@policy == state.policy &&
@transitions == state.transitions
def ==(other)
return @cardinality == other.cardinality && @policy == other.policy && @transitions == other.transitions
end

def to_h
return {
transitions: @transitions,
cardinality: @cardinality,
priority: @priority,
policy: @policy
}
end
Expand All @@ -78,7 +70,7 @@ def to_graph
end

def inspect
return "[ Cardinality: #{@cardinality} ; Policy: #{@policy} ; Priority: #{@priority} ] -> #{to_graph.to_s}"
return "[ Cardinality: #{@cardinality} ; Policy: #{@policy} ] -> #{to_graph}"
end

def serialize
Expand Down
2 changes: 0 additions & 2 deletions lib/rworkflow/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ class Worker
include Sidekiq::Worker
include SidekiqHelper

sidekiq_options queue: :mysql

def perform(id, state_name)
@workflow = self.class.load_workflow(id)
@state_name = state_name
Expand Down
2 changes: 1 addition & 1 deletion test/flow_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_flow_state_policy_wait
state.transition :pushed, "WaitState"
end

lc.state("WaitState", {cardinality: initial_objects.size, priority: State::DEFAULT_PRIORITY, policy: State::STATE_POLICY_WAIT}) do |state|
lc.state("WaitState", {cardinality: initial_objects.size, policy: State::STATE_POLICY_WAIT}) do |state|
state.transition :collected, Flow::STATE_SUCCESSFUL
end

Expand Down
4 changes: 2 additions & 2 deletions test/lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ def test_concat
end

class LCFactory
def self.simple_lifecycle(state_name, transition, cardinality = 1, priority = nil)
def self.simple_lifecycle(state_name, transition, cardinality = 1)
return Rworkflow::Lifecycle.new do |cycle|
cycle.state(state_name, cardinality: cardinality, priority: priority) do |state|
cycle.state(state_name, cardinality: cardinality) do |state|
state.transition transition, Rworkflow::SidekiqFlow::STATE_SUCCESSFUL
state.transition :failed, Rworkflow::SidekiqFlow::STATE_FAILED
end
Expand Down
2 changes: 1 addition & 1 deletion test/sidekiq_flow_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_collector_state_workflow
state.transition :sent, 'Rworkflow::SidekiqFlowTest::PostcardCollector'
end

lc.state("Rworkflow::SidekiqFlowTest::PostcardCollector", {cardinality: Lifecycle::CARDINALITY_ALL_STARTED, priority: State::DEFAULT_PRIORITY, policy: State::STATE_POLICY_WAIT}) do |state|
lc.state("Rworkflow::SidekiqFlowTest::PostcardCollector", {cardinality: Lifecycle::CARDINALITY_ALL_STARTED, policy: State::STATE_POLICY_WAIT}) do |state|
state.transition :received, Rworkflow::Flow::STATE_SUCCESSFUL
end

Expand Down
7 changes: 1 addition & 6 deletions test/state_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ def test_equality
other_state.policy = State::STATE_POLICY_WAIT
assert_not_equal @state, other_state, 'State A != B: different policies!'

other_state = State.new
other_state.priority = :high
assert_not_equal @state, other_state, 'State A != B: different priorities!'

other_state = State.new
other_state.cardinality = 32
assert_not_equal @state, other_state, 'State A != B: different cardinalities!'
Expand Down Expand Up @@ -74,14 +70,13 @@ def test_clone
@state.transition('a', 'b')
@state.policy = State::STATE_POLICY_WAIT
@state.cardinality = 2
@state.priority = :high
cloned = @state.clone
assert_equal @state, cloned, 'Original and cloned states should be equal'
assert !@state.equal?(cloned), 'Original and cloned states should not be the same object'
end

def test_merge
other_state = State.new(cardinality: 2, priority: :high, policy: State::STATE_POLICY_WAIT)
other_state = State.new(cardinality: 2, policy: State::STATE_POLICY_WAIT)
merged = @state.merge(other_state)
assert_equal merged, other_state, 'Merged state should be equal to state B'

Expand Down

0 comments on commit ba7cd7e

Please sign in to comment.