Skip to content

Commit

Permalink
fixup! Implement a versioned workflow concern
Browse files Browse the repository at this point in the history
  • Loading branch information
antstorm committed Feb 18, 2022
1 parent 59a1f4b commit deab24e
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 24 deletions.
4 changes: 2 additions & 2 deletions examples/spec/integration/versioned_workflow_spec.rb
Expand Up @@ -15,7 +15,7 @@
end

context 'with explicit version' do
let(:options) { { options: { headers: { 'Version' => '1' } } } }
let(:options) { { options: { version: 1 } } }

it 'executes the specified version' do
result = run_workflow(described_class, options)
Expand All @@ -28,7 +28,7 @@
end

context 'with a non-existing version' do
let(:options) { { options: { headers: { 'Version' => '3' } } } }
let(:options) { { options: { version: 3 } } }

it 'raises an error' do
expect do
Expand Down
35 changes: 25 additions & 10 deletions lib/cadence/concerns/versioned.rb
Expand Up @@ -13,8 +13,10 @@ def self.included(base)
class UnknownWorkflowVersion < Cadence::ClientError; end

class Workflow
def initialize(main_class, headers)
version = headers.fetch(VERSION_HEADER_NAME, main_class.latest_version).to_i
attr_reader :version, :main_class, :version_class

def initialize(main_class, version = nil)
version ||= main_class.pick_version
version_class = main_class.version_class_for(version)

@version = version
Expand All @@ -23,11 +25,21 @@ def initialize(main_class, headers)
end

def domain
version_class.domain || main_class.domain
if version_class.domain
warn '[WARNING] Overriding domain in a workflow version is not yet supported. ' \
"Called from #{version_class}."
end

main_class.domain
end

def task_list
version_class.task_list || main_class.task_list
if version_class.task_list
warn '[WARNING] Overriding task_list in a workflow version is not yet supported. ' \
"Called from #{version_class}."
end

main_class.task_list
end

def retry_policy
Expand All @@ -41,10 +53,6 @@ def timeouts
def headers
(version_class.headers || main_class.headers || {}).merge(VERSION_HEADER_NAME => version.to_s)
end

private

attr_reader :version, :main_class, :version_class
end

module ClassMethods
Expand All @@ -70,12 +78,19 @@ def version_class_for(version)
end
end

def latest_version
versions.keys.max
def pick_version
version_picker.call(versions.keys.max)
end

private

DEFAULT_VERSION_PICKER = lambda { |latest_version| latest_version }

def version_picker(&block)
return @version_picker || DEFAULT_VERSION_PICKER unless block_given?
@version_picker = block
end

def versions
# Initialize with the default version
@versions ||= { DEFAULT_VERSION => self }
Expand Down
2 changes: 1 addition & 1 deletion lib/cadence/execution_options.rb
Expand Up @@ -19,7 +19,7 @@ def initialize(object, options, defaults = nil)
# For Cadence::Workflow and Cadence::Activity use defined values as the next option
if object.singleton_class.included_modules.include?(Concerns::Executable)
# In a versioned workflow merge the specific version options with default workflow options
object = Concerns::Versioned::Workflow.new(object, @headers) if versioned?(object)
object = Concerns::Versioned::Workflow.new(object, options[:version]) if versioned?(object)

@domain ||= object.domain
@task_list ||= object.task_list
Expand Down
8 changes: 7 additions & 1 deletion lib/cadence/worker.rb
Expand Up @@ -3,6 +3,7 @@
require 'cadence/execution_options'
require 'cadence/executable_lookup'
require 'cadence/middleware/entry'
require 'cadence/concerns/versioned'

module Cadence
class Worker
Expand All @@ -18,7 +19,12 @@ def initialize(config = Cadence.configuration, **options)
end

def register_workflow(workflow_class, options = {})
execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options)
execution_options = ExecutionOptions.new(
workflow_class,
# ensure default version to avoid executing version picker in worker context
options.merge(version: Cadence::Concerns::Versioned::DEFAULT_VERSION),
config.default_execution_options
)
key = [execution_options.domain, execution_options.task_list]

@workflows[key].add(execution_options.name, workflow_class)
Expand Down
237 changes: 237 additions & 0 deletions spec/unit/lib/cadence/concerns/versioned_spec.rb
@@ -0,0 +1,237 @@
require 'cadence/concerns/versioned'
require 'cadence/workflow/context'

describe Cadence::Concerns::Versioned do
class TestVersionedWorkflowV1 < Cadence::Workflow
end

class TestVersionedWorkflowV2 < Cadence::Workflow
domain 'new-domain'
task_list 'new-task-list'
retry_policy interval: 5, backoff: 1, max_attempts: 2
timeouts execution: 1
headers 'HeaderV2' => 'TestV2'
end

class TestVersionedWorkflow < Cadence::Workflow
include Cadence::Concerns::Versioned

domain 'domain'
task_list 'task-list'
retry_policy interval: 1, backoff: 2, max_attempts: 5
timeouts start_to_close: 10
headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB'

version 1, TestVersionedWorkflowV1
version 2, TestVersionedWorkflowV2
end

describe described_class::Workflow do
subject { described_class.new(TestVersionedWorkflow, version) }

describe '#initialize' do
before { allow(TestVersionedWorkflow).to receive(:pick_version).and_call_original }

context 'when passed no version header' do
let(:version) { nil }

it 'initializes the latest version' do
expect(subject.version).to eq(2)
expect(subject.main_class).to eq(TestVersionedWorkflow)
expect(subject.version_class).to eq(TestVersionedWorkflowV2)
end

it 'calls version picker' do
subject

expect(TestVersionedWorkflow).to have_received(:pick_version)
end
end

context 'when passed a specific version header' do
let(:version) { 1 }

it 'initializes the specified version' do
expect(subject.version).to eq(1)
expect(subject.main_class).to eq(TestVersionedWorkflow)
expect(subject.version_class).to eq(TestVersionedWorkflowV1)
expect(TestVersionedWorkflow).not_to have_received(:pick_version)
end
end

context 'when passed a non-existing version' do
let(:version) { 3 }

it 'raises UnknownWorkflowVersion' do
expect { subject }.to raise_error(
Cadence::Concerns::Versioned::UnknownWorkflowVersion,
'Unknown version 3 for TestVersionedWorkflow'
)
expect(TestVersionedWorkflow).not_to have_received(:pick_version)
end
end
end

context 'when version does not override the attributes' do
let(:version) { 1 }

before { allow(subject).to receive(:warn) }

describe '#domain' do
it 'returns default version domain' do
expect(subject.domain).to eq('domain')
expect(subject).not_to have_received(:warn)
end
end

describe '#task_list' do
it 'returns default version task_list' do
expect(subject.task_list).to eq('task-list')
expect(subject).not_to have_received(:warn)
end
end

describe '#retry_policy' do
it 'returns default version retry_policy' do
expect(subject.retry_policy).to eq(interval: 1, backoff: 2, max_attempts: 5)
end
end

describe '#timeouts' do
it 'returns default version timeouts' do
expect(subject.timeouts).to eq(start_to_close: 10)
end
end

describe '#headers' do
it 'returns default version headers including version header' do
expect(subject.headers).to eq(
'HeaderA' => 'TestA',
'HeaderB' => 'TestB',
Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '1'
)
end
end
end

context 'when version overwrites the attribute' do
let(:version) { 2 }

before { allow(subject).to receive(:warn) }

describe '#domain' do
it 'returns default version domain and warns' do
expect(subject.domain).to eq('domain')
expect(subject)
.to have_received(:warn)
.with('[WARNING] Overriding domain in a workflow version is not yet supported. ' \
'Called from TestVersionedWorkflowV2.')
end
end

describe '#task_list' do
it 'returns default version task_list and warns' do
expect(subject.task_list).to eq('task-list')
expect(subject)
.to have_received(:warn)
.with('[WARNING] Overriding task_list in a workflow version is not yet supported. ' \
'Called from TestVersionedWorkflowV2.')
end
end

describe '#retry_policy' do
it 'returns overriden retry_policy' do
expect(subject.retry_policy).to eq(interval: 5, backoff: 1, max_attempts: 2)
end
end

describe '#timeouts' do
it 'returns overriden timeouts' do
expect(subject.timeouts).to eq(execution: 1)
end
end

describe '#headers' do
it 'returns overriden headers including version header' do
expect(subject.headers).to eq(
'HeaderV2' => 'TestV2',
Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '2'
)
end
end
end
end

describe described_class::ClassMethods do
subject { TestVersionedWorkflow }

describe '.version' do
after { TestVersionedWorkflow.send(:versions).delete(4) }

it 'adds a new version' do
subject.version(4, TestVersionedWorkflowV1)

expect(subject.version_class_for(4)).to eq(TestVersionedWorkflowV1)
end
end

describe '.execute_in_context' do
let(:context) { instance_double(Cadence::Workflow::Context, headers: headers) }

context 'when called with a non-default version' do
let(:headers) { { Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '2' } }
before { allow(TestVersionedWorkflowV2).to receive(:execute_in_context) }

it 'calls version' do
subject.execute_in_context(context, nil)

expect(TestVersionedWorkflowV2).to have_received(:execute_in_context).with(context, nil)
end
end
end

describe '.version_class_for' do
context 'when given a valid version' do
it 'returns version class' do
expect(subject.version_class_for(2)).to eq(TestVersionedWorkflowV2)
end
end

context 'when given a default version' do
it 'returns default version class' do
expect(subject.version_class_for(Cadence::Concerns::Versioned::DEFAULT_VERSION))
.to eq(TestVersionedWorkflow)
end
end

context 'when given an invalid version' do
it 'raises UnknownWorkflowVersion' do
expect { subject.version_class_for(3) }.to raise_error(
Cadence::Concerns::Versioned::UnknownWorkflowVersion,
'Unknown version 3 for TestVersionedWorkflow'
)
end
end
end

describe '.pick_version' do
class TestPickVersionWorkflow < Cadence::Workflow
include Cadence::Concerns::Versioned

version_picker { |latest_version| latest_version + 42 }
end

context 'when using default version picker' do
it 'returns the latest version' do
expect(TestVersionedWorkflow.pick_version).to eq(2)
end
end

context 'when using overriden version picker' do
it 'returns a custom version' do
expect(TestPickVersionWorkflow.pick_version).to eq(42)
end
end
end
end
end

0 comments on commit deab24e

Please sign in to comment.