From deab24ee22ba4b892a72315e892e5efcbfc60d3e Mon Sep 17 00:00:00 2001 From: antstorm Date: Fri, 18 Feb 2022 14:50:11 +0000 Subject: [PATCH] fixup! Implement a versioned workflow concern --- .../integration/versioned_workflow_spec.rb | 4 +- lib/cadence/concerns/versioned.rb | 35 ++- lib/cadence/execution_options.rb | 2 +- lib/cadence/worker.rb | 8 +- .../lib/cadence/concerns/versioned_spec.rb | 237 ++++++++++++++++++ .../lib/cadence/execution_options_spec.rb | 20 +- 6 files changed, 282 insertions(+), 24 deletions(-) create mode 100644 spec/unit/lib/cadence/concerns/versioned_spec.rb diff --git a/examples/spec/integration/versioned_workflow_spec.rb b/examples/spec/integration/versioned_workflow_spec.rb index f5a8a181..b4567853 100644 --- a/examples/spec/integration/versioned_workflow_spec.rb +++ b/examples/spec/integration/versioned_workflow_spec.rb @@ -15,7 +15,7 @@ end context 'with explicit version' do - let(:options) { { options: { headers: { 'Version' => '1' } } } } + let(:options) { { options: { version: 1 } } } it 'executes the specified version' do result = run_workflow(described_class, options) @@ -28,7 +28,7 @@ end context 'with a non-existing version' do - let(:options) { { options: { headers: { 'Version' => '3' } } } } + let(:options) { { options: { version: 3 } } } it 'raises an error' do expect do diff --git a/lib/cadence/concerns/versioned.rb b/lib/cadence/concerns/versioned.rb index 416c6894..8c4391c7 100644 --- a/lib/cadence/concerns/versioned.rb +++ b/lib/cadence/concerns/versioned.rb @@ -13,8 +13,10 @@ def self.included(base) class UnknownWorkflowVersion < Cadence::ClientError; end class Workflow - def initialize(main_class, headers) - version = headers.fetch(VERSION_HEADER_NAME, main_class.latest_version).to_i + attr_reader :version, :main_class, :version_class + + def initialize(main_class, version = nil) + version ||= main_class.pick_version version_class = main_class.version_class_for(version) @version = version @@ -23,11 +25,21 @@ def initialize(main_class, headers) end def domain - version_class.domain || main_class.domain + if version_class.domain + warn '[WARNING] Overriding domain in a workflow version is not yet supported. ' \ + "Called from #{version_class}." + end + + main_class.domain end def task_list - version_class.task_list || main_class.task_list + if version_class.task_list + warn '[WARNING] Overriding task_list in a workflow version is not yet supported. ' \ + "Called from #{version_class}." + end + + main_class.task_list end def retry_policy @@ -41,10 +53,6 @@ def timeouts def headers (version_class.headers || main_class.headers || {}).merge(VERSION_HEADER_NAME => version.to_s) end - - private - - attr_reader :version, :main_class, :version_class end module ClassMethods @@ -70,12 +78,19 @@ def version_class_for(version) end end - def latest_version - versions.keys.max + def pick_version + version_picker.call(versions.keys.max) end private + DEFAULT_VERSION_PICKER = lambda { |latest_version| latest_version } + + def version_picker(&block) + return @version_picker || DEFAULT_VERSION_PICKER unless block_given? + @version_picker = block + end + def versions # Initialize with the default version @versions ||= { DEFAULT_VERSION => self } diff --git a/lib/cadence/execution_options.rb b/lib/cadence/execution_options.rb index 6f8c5796..82a6cb9d 100644 --- a/lib/cadence/execution_options.rb +++ b/lib/cadence/execution_options.rb @@ -19,7 +19,7 @@ def initialize(object, options, defaults = nil) # For Cadence::Workflow and Cadence::Activity use defined values as the next option if object.singleton_class.included_modules.include?(Concerns::Executable) # In a versioned workflow merge the specific version options with default workflow options - object = Concerns::Versioned::Workflow.new(object, @headers) if versioned?(object) + object = Concerns::Versioned::Workflow.new(object, options[:version]) if versioned?(object) @domain ||= object.domain @task_list ||= object.task_list diff --git a/lib/cadence/worker.rb b/lib/cadence/worker.rb index a3632950..5b1cb23d 100644 --- a/lib/cadence/worker.rb +++ b/lib/cadence/worker.rb @@ -3,6 +3,7 @@ require 'cadence/execution_options' require 'cadence/executable_lookup' require 'cadence/middleware/entry' +require 'cadence/concerns/versioned' module Cadence class Worker @@ -18,7 +19,12 @@ def initialize(config = Cadence.configuration, **options) end def register_workflow(workflow_class, options = {}) - execution_options = ExecutionOptions.new(workflow_class, options, config.default_execution_options) + execution_options = ExecutionOptions.new( + workflow_class, + # ensure default version to avoid executing version picker in worker context + options.merge(version: Cadence::Concerns::Versioned::DEFAULT_VERSION), + config.default_execution_options + ) key = [execution_options.domain, execution_options.task_list] @workflows[key].add(execution_options.name, workflow_class) diff --git a/spec/unit/lib/cadence/concerns/versioned_spec.rb b/spec/unit/lib/cadence/concerns/versioned_spec.rb new file mode 100644 index 00000000..c8e2fd7c --- /dev/null +++ b/spec/unit/lib/cadence/concerns/versioned_spec.rb @@ -0,0 +1,237 @@ +require 'cadence/concerns/versioned' +require 'cadence/workflow/context' + +describe Cadence::Concerns::Versioned do + class TestVersionedWorkflowV1 < Cadence::Workflow + end + + class TestVersionedWorkflowV2 < Cadence::Workflow + domain 'new-domain' + task_list 'new-task-list' + retry_policy interval: 5, backoff: 1, max_attempts: 2 + timeouts execution: 1 + headers 'HeaderV2' => 'TestV2' + end + + class TestVersionedWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + domain 'domain' + task_list 'task-list' + retry_policy interval: 1, backoff: 2, max_attempts: 5 + timeouts start_to_close: 10 + headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB' + + version 1, TestVersionedWorkflowV1 + version 2, TestVersionedWorkflowV2 + end + + describe described_class::Workflow do + subject { described_class.new(TestVersionedWorkflow, version) } + + describe '#initialize' do + before { allow(TestVersionedWorkflow).to receive(:pick_version).and_call_original } + + context 'when passed no version header' do + let(:version) { nil } + + it 'initializes the latest version' do + expect(subject.version).to eq(2) + expect(subject.main_class).to eq(TestVersionedWorkflow) + expect(subject.version_class).to eq(TestVersionedWorkflowV2) + end + + it 'calls version picker' do + subject + + expect(TestVersionedWorkflow).to have_received(:pick_version) + end + end + + context 'when passed a specific version header' do + let(:version) { 1 } + + it 'initializes the specified version' do + expect(subject.version).to eq(1) + expect(subject.main_class).to eq(TestVersionedWorkflow) + expect(subject.version_class).to eq(TestVersionedWorkflowV1) + expect(TestVersionedWorkflow).not_to have_received(:pick_version) + end + end + + context 'when passed a non-existing version' do + let(:version) { 3 } + + it 'raises UnknownWorkflowVersion' do + expect { subject }.to raise_error( + Cadence::Concerns::Versioned::UnknownWorkflowVersion, + 'Unknown version 3 for TestVersionedWorkflow' + ) + expect(TestVersionedWorkflow).not_to have_received(:pick_version) + end + end + end + + context 'when version does not override the attributes' do + let(:version) { 1 } + + before { allow(subject).to receive(:warn) } + + describe '#domain' do + it 'returns default version domain' do + expect(subject.domain).to eq('domain') + expect(subject).not_to have_received(:warn) + end + end + + describe '#task_list' do + it 'returns default version task_list' do + expect(subject.task_list).to eq('task-list') + expect(subject).not_to have_received(:warn) + end + end + + describe '#retry_policy' do + it 'returns default version retry_policy' do + expect(subject.retry_policy).to eq(interval: 1, backoff: 2, max_attempts: 5) + end + end + + describe '#timeouts' do + it 'returns default version timeouts' do + expect(subject.timeouts).to eq(start_to_close: 10) + end + end + + describe '#headers' do + it 'returns default version headers including version header' do + expect(subject.headers).to eq( + 'HeaderA' => 'TestA', + 'HeaderB' => 'TestB', + Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '1' + ) + end + end + end + + context 'when version overwrites the attribute' do + let(:version) { 2 } + + before { allow(subject).to receive(:warn) } + + describe '#domain' do + it 'returns default version domain and warns' do + expect(subject.domain).to eq('domain') + expect(subject) + .to have_received(:warn) + .with('[WARNING] Overriding domain in a workflow version is not yet supported. ' \ + 'Called from TestVersionedWorkflowV2.') + end + end + + describe '#task_list' do + it 'returns default version task_list and warns' do + expect(subject.task_list).to eq('task-list') + expect(subject) + .to have_received(:warn) + .with('[WARNING] Overriding task_list in a workflow version is not yet supported. ' \ + 'Called from TestVersionedWorkflowV2.') + end + end + + describe '#retry_policy' do + it 'returns overriden retry_policy' do + expect(subject.retry_policy).to eq(interval: 5, backoff: 1, max_attempts: 2) + end + end + + describe '#timeouts' do + it 'returns overriden timeouts' do + expect(subject.timeouts).to eq(execution: 1) + end + end + + describe '#headers' do + it 'returns overriden headers including version header' do + expect(subject.headers).to eq( + 'HeaderV2' => 'TestV2', + Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '2' + ) + end + end + end + end + + describe described_class::ClassMethods do + subject { TestVersionedWorkflow } + + describe '.version' do + after { TestVersionedWorkflow.send(:versions).delete(4) } + + it 'adds a new version' do + subject.version(4, TestVersionedWorkflowV1) + + expect(subject.version_class_for(4)).to eq(TestVersionedWorkflowV1) + end + end + + describe '.execute_in_context' do + let(:context) { instance_double(Cadence::Workflow::Context, headers: headers) } + + context 'when called with a non-default version' do + let(:headers) { { Cadence::Concerns::Versioned::VERSION_HEADER_NAME => '2' } } + before { allow(TestVersionedWorkflowV2).to receive(:execute_in_context) } + + it 'calls version' do + subject.execute_in_context(context, nil) + + expect(TestVersionedWorkflowV2).to have_received(:execute_in_context).with(context, nil) + end + end + end + + describe '.version_class_for' do + context 'when given a valid version' do + it 'returns version class' do + expect(subject.version_class_for(2)).to eq(TestVersionedWorkflowV2) + end + end + + context 'when given a default version' do + it 'returns default version class' do + expect(subject.version_class_for(Cadence::Concerns::Versioned::DEFAULT_VERSION)) + .to eq(TestVersionedWorkflow) + end + end + + context 'when given an invalid version' do + it 'raises UnknownWorkflowVersion' do + expect { subject.version_class_for(3) }.to raise_error( + Cadence::Concerns::Versioned::UnknownWorkflowVersion, + 'Unknown version 3 for TestVersionedWorkflow' + ) + end + end + end + + describe '.pick_version' do + class TestPickVersionWorkflow < Cadence::Workflow + include Cadence::Concerns::Versioned + + version_picker { |latest_version| latest_version + 42 } + end + + context 'when using default version picker' do + it 'returns the latest version' do + expect(TestVersionedWorkflow.pick_version).to eq(2) + end + end + + context 'when using overriden version picker' do + it 'returns a custom version' do + expect(TestPickVersionWorkflow.pick_version).to eq(42) + end + end + end + end +end diff --git a/spec/unit/lib/cadence/execution_options_spec.rb b/spec/unit/lib/cadence/execution_options_spec.rb index 46bc4caf..81accfc7 100644 --- a/spec/unit/lib/cadence/execution_options_spec.rb +++ b/spec/unit/lib/cadence/execution_options_spec.rb @@ -199,18 +199,18 @@ class TestWorkflow < Cadence::Workflow end context 'when initialized with a Versioned workflow' do - class TestVersionedWorkflowV1 < Cadence::Workflow + class TestVersionedExecutionOptionsWorkflowV1 < Cadence::Workflow retry_policy interval: 5, backoff: 1, max_attempts: 2 timeouts execution: 1 end - class TestVersionedWorkflowV2 < Cadence::Workflow + class TestVersionedExecutionOptionsWorkflowV2 < Cadence::Workflow domain 'new-domain' task_list 'new-task-list' headers 'HeaderV2' => 'TestV2' end - class TestVersionedWorkflow < Cadence::Workflow + class TestVersionedExecutionOptionsWorkflow < Cadence::Workflow include Cadence::Concerns::Versioned domain 'domain' @@ -219,18 +219,18 @@ class TestVersionedWorkflow < Cadence::Workflow timeouts start_to_close: 10 headers 'HeaderA' => 'TestA', 'HeaderB' => 'TestB' - version 1, TestVersionedWorkflowV1 - version 2, TestVersionedWorkflowV2 + version 1, TestVersionedExecutionOptionsWorkflowV1 + version 2, TestVersionedExecutionOptionsWorkflowV2 end - let(:object) { TestVersionedWorkflow } + let(:object) { TestVersionedExecutionOptionsWorkflow } let(:options) { {} } context 'when initialized without the version header' do it 'is initialized with a mix of latest version and default version values' do expect(subject.name).to eq(object.name) - expect(subject.domain).to eq('new-domain') - expect(subject.task_list).to eq('new-task-list') + expect(subject.domain).to eq('domain') + expect(subject.task_list).to eq('task-list') expect(subject.retry_policy).to be_an_instance_of(Cadence::RetryPolicy) expect(subject.retry_policy.interval).to eq(1) expect(subject.retry_policy.backoff).to eq(2) @@ -244,7 +244,7 @@ class TestVersionedWorkflow < Cadence::Workflow end context 'when initialized with the version header' do - let(:options) { { headers: { 'Version' => '1' } } } + let(:options) { { version: 1 } } it 'is initialized with a mix of specified version and default version values' do expect(subject.name).to eq(object.name) @@ -264,7 +264,7 @@ class TestVersionedWorkflow < Cadence::Workflow end context 'when initialized with the default version' do - let(:options) { { headers: { 'Version' => '0' } } } + let(:options) { { version: 0 } } it 'is initialized with a default version values' do expect(subject.name).to eq(object.name)