From 3feb258b98e322301c866542b845663a309d3f0a Mon Sep 17 00:00:00 2001 From: antstorm Date: Tue, 1 Mar 2022 13:51:54 +0000 Subject: [PATCH 01/16] Remove global config reference from Temporal::Client --- lib/temporal/client.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index 5e622743..ac0b3d51 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -281,7 +281,7 @@ def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_ # for reference # @param details [String, Array, nil] optional details to be stored in history def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil) - namespace ||= Temporal.configuration.namespace + namespace ||= config.namespace connection.terminate_workflow_execution( namespace: namespace, From f379a5d7d4c0a3b0a8737204a8e760d82a40b1f8 Mon Sep 17 00:00:00 2001 From: antstorm Date: Thu, 3 Mar 2022 19:02:49 +0000 Subject: [PATCH 02/16] Implement ConverterWrapper class --- lib/temporal/configuration.rb | 9 +- lib/temporal/converter_wrapper.rb | 63 +++++++++++ spec/fabricators/grpc/payload_fabricator.rb | 22 ++++ spec/fabricators/grpc/payloads_fabricator.rb | 9 ++ spec/unit/lib/temporal/configuration_spec.rb | 28 ++++- .../lib/temporal/converter_wrapper_spec.rb | 105 ++++++++++++++++++ 6 files changed, 233 insertions(+), 3 deletions(-) create mode 100644 lib/temporal/converter_wrapper.rb create mode 100644 spec/fabricators/grpc/payloads_fabricator.rb create mode 100644 spec/unit/lib/temporal/converter_wrapper_spec.rb diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index 8d36615c..f85b6770 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -4,6 +4,7 @@ require 'temporal/connection/converter/payload/bytes' require 'temporal/connection/converter/payload/json' require 'temporal/connection/converter/composite' +require 'temporal/converter_wrapper' module Temporal class Configuration @@ -11,7 +12,6 @@ class Configuration Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, keyword_init: true) attr_reader :timeouts, :error_handlers - attr_writer :converter attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :namespace, :task_queue, :headers # See https://docs.temporal.io/blog/activity-timeouts/ for general docs. @@ -72,7 +72,12 @@ def timeouts=(new_timeouts) end def converter - @converter + @converter_wrapper ||= ConverterWrapper.new(@converter) + end + + def converter=(new_converter) + @converter = new_converter + @converter_wrapper = nil end def for_connection diff --git a/lib/temporal/converter_wrapper.rb b/lib/temporal/converter_wrapper.rb new file mode 100644 index 00000000..caaf2c31 --- /dev/null +++ b/lib/temporal/converter_wrapper.rb @@ -0,0 +1,63 @@ +# This class provides convenience methods for accessing the converter. it is backwards compatible +# with Temporal::Connection::Converter::Base interface, however it adds new methods specific to +# different conversion scenarios. + +module Temporal + class ConverterWrapper + def initialize(converter) + @converter = converter + end + + def from_payloads(payloads) + converter.from_payloads(payloads) + end + + def from_payload(payload) + converter.from_payload(payload) + end + + def from_result_payloads(payloads) + from_payloads(payloads)&.first + end + + def from_details_payloads(payloads) + from_payloads(payloads)&.first + end + + def from_signal_payloads(payloads) + from_payloads(payloads)&.first + end + + def from_payload_map(payload_map) + payload_map.map { |key, value| [key, from_payload(value)] }.to_h + end + + def to_payloads(data) + converter.to_payloads(data) + end + + def to_payload(data) + converter.to_payload(data) + end + + def to_result_payloads(data) + to_payloads([data]) + end + + def to_details_payloads(data) + to_payloads([data]) + end + + def to_signal_payloads(data) + to_payloads([data]) + end + + def to_payload_map(data) + data.transform_values(&method(:to_payload)) + end + + private + + attr_reader :converter + end +end diff --git a/spec/fabricators/grpc/payload_fabricator.rb b/spec/fabricators/grpc/payload_fabricator.rb index d6476915..c797f4db 100644 --- a/spec/fabricators/grpc/payload_fabricator.rb +++ b/spec/fabricators/grpc/payload_fabricator.rb @@ -1,3 +1,25 @@ +require 'temporal/connection/converter/payload/nil' + Fabricator(:api_payload, from: Temporal::Api::Common::V1::Payload) do metadata { Google::Protobuf::Map.new(:string, :bytes) } end + +Fabricator(:api_payload_nil, from: :api_payload) do + metadata do + Google::Protobuf::Map.new(:string, :bytes).tap do |m| + m['encoding'] = Temporal::Connection::Converter::Payload::Nil::ENCODING + end + end +end + +Fabricator(:api_payload_bytes, from: :api_payload) do + transient :bytes + + metadata do + Google::Protobuf::Map.new(:string, :bytes).tap do |m| + m['encoding'] = Temporal::Connection::Converter::Payload::Bytes::ENCODING + end + end + + data { |attrs| attrs.fetch(:bytes, 'foobar') } +end diff --git a/spec/fabricators/grpc/payloads_fabricator.rb b/spec/fabricators/grpc/payloads_fabricator.rb new file mode 100644 index 00000000..d4c95514 --- /dev/null +++ b/spec/fabricators/grpc/payloads_fabricator.rb @@ -0,0 +1,9 @@ +Fabricator(:api_payloads, from: Temporal::Api::Common::V1::Payloads) do + transient :payloads_array + + payloads do |attrs| + Google::Protobuf::RepeatedField.new(:message, Temporal::Api::Common::V1::Payload).tap do |m| + m.concat(Array(attrs.fetch(:payloads_array, Fabricate(:api_payload)))) + end + end +end diff --git a/spec/unit/lib/temporal/configuration_spec.rb b/spec/unit/lib/temporal/configuration_spec.rb index 7e083429..175e1387 100644 --- a/spec/unit/lib/temporal/configuration_spec.rb +++ b/spec/unit/lib/temporal/configuration_spec.rb @@ -26,4 +26,30 @@ expect(timeouts[:heartbeat]).to be(nil) end end -end \ No newline at end of file + + describe '#converter' do + it 'wraps the provided converter' do + converter_wrapper = subject.converter + + expect(converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(converter_wrapper.send(:converter)).to eq(described_class::DEFAULT_CONVERTER) + end + end + + describe '#converter=' do + let(:converter) { instance_double(Temporal::Connection::Converter::Composite) } + + it 'resets the wrapper when converter has changed' do + old_converter_wrapper = subject.converter + + expect(old_converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(old_converter_wrapper.send(:converter)).to eq(described_class::DEFAULT_CONVERTER) + + subject.converter = converter + new_converter_wrapper = subject.converter + + expect(new_converter_wrapper).to be_a(Temporal::ConverterWrapper) + expect(new_converter_wrapper.send(:converter)).to eq(converter) + end + end +end diff --git a/spec/unit/lib/temporal/converter_wrapper_spec.rb b/spec/unit/lib/temporal/converter_wrapper_spec.rb new file mode 100644 index 00000000..24cdf55f --- /dev/null +++ b/spec/unit/lib/temporal/converter_wrapper_spec.rb @@ -0,0 +1,105 @@ +require 'temporal/converter_wrapper' +require 'temporal/connection/converter/payload/bytes' +require 'temporal/connection/converter/payload/nil' +require 'temporal/connection/converter/composite' + +describe Temporal::ConverterWrapper do + subject { described_class.new(converter) } + let(:converter) do + Temporal::Connection::Converter::Composite.new(payload_converters: [ + Temporal::Connection::Converter::Payload::Bytes.new, + Temporal::Connection::Converter::Payload::Nil.new + ]) + end + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes, payload_nil]) } + let(:payload_bytes) { Fabricate(:api_payload_bytes, bytes: 'test-payload') } + let(:payload_nil) { Fabricate(:api_payload_nil) } + + describe '#from_payloads' do + it 'converts' do + expect(subject.from_payloads(payloads)).to eq(['test-payload', nil]) + end + end + + describe '#from_payload' do + it 'converts' do + expect(subject.from_payload(payload_bytes)).to eq('test-payload') + end + end + + describe '#from_result_payloads' do + it 'converts' do + expect(subject.from_result_payloads(payloads)).to eq('test-payload') + end + end + + describe '#from_details_payloads' do + it 'converts first payload' do + expect(subject.from_details_payloads(payloads)).to eq('test-payload') + end + end + + describe '#from_signal_payloads' do + it 'converts first payload' do + expect(subject.from_signal_payloads(payloads)).to eq('test-payload') + end + end + + describe '#from_payload_map' do + let(:payload_map) do + Google::Protobuf::Map.new(:string, :message, Temporal::Api::Common::V1::Payload).tap do |m| + m['first'] = payload_bytes + m['second'] = payload_nil + end + end + + it 'converts first payload' do + expect(subject.from_payload_map(payload_map)) + .to eq('first' => 'test-payload', 'second' => nil) + end + end + + describe '#to_payloads' do + it 'converts' do + expect(subject.to_payloads(['test-payload'.b, nil])).to eq(payloads) + end + end + + describe '#to_payload' do + it 'converts' do + expect(subject.to_payload('test-payload'.b)).to eq(payload_bytes) + end + end + + describe '#to_result_payloads' do + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes]) } + + it 'converts' do + expect(subject.to_result_payloads('test-payload'.b)).to eq(payloads) + end + end + + describe '#to_details_payloads' do + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes]) } + + it 'converts' do + expect(subject.to_details_payloads('test-payload'.b)).to eq(payloads) + end + end + + describe '#to_signal_payloads' do + let(:payloads) { Fabricate(:api_payloads, payloads_array: [payload_bytes]) } + + it 'converts' do + expect(subject.to_signal_payloads('test-payload'.b)).to eq(payloads) + end + end + + describe '#to_payload_map' do + let(:payload_map) { { first: payload_bytes, second: payload_nil } } + + it 'converts' do + expect(subject.to_payload_map(first: 'test-payload'.b, second: nil)).to eq(payload_map) + end + end +end From 5a18942246254e08f508a777e5f55649543b4a5f Mon Sep 17 00:00:00 2001 From: antstorm Date: Thu, 3 Mar 2022 19:27:18 +0000 Subject: [PATCH 03/16] Use ConverterWrapper in GRPC --- lib/temporal/configuration.rb | 5 +-- lib/temporal/connection.rb | 2 +- lib/temporal/connection/grpc.rb | 38 ++++++++++------------ spec/unit/lib/temporal/connection_spec.rb | 18 ++++++++++ spec/unit/lib/temporal/grpc_client_spec.rb | 3 +- 5 files changed, 42 insertions(+), 24 deletions(-) create mode 100644 spec/unit/lib/temporal/connection_spec.rb diff --git a/lib/temporal/configuration.rb b/lib/temporal/configuration.rb index f85b6770..7d69f0eb 100644 --- a/lib/temporal/configuration.rb +++ b/lib/temporal/configuration.rb @@ -8,7 +8,7 @@ module Temporal class Configuration - Connection = Struct.new(:type, :host, :port, keyword_init: true) + Connection = Struct.new(:type, :host, :port, :converter, keyword_init: true) Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, keyword_init: true) attr_reader :timeouts, :error_handlers @@ -84,7 +84,8 @@ def for_connection Connection.new( type: connection_type, host: host, - port: port + port: port, + converter: converter ).freeze end diff --git a/lib/temporal/connection.rb b/lib/temporal/connection.rb index b499ca73..74630be9 100644 --- a/lib/temporal/connection.rb +++ b/lib/temporal/connection.rb @@ -15,7 +15,7 @@ def self.generate(configuration) thread_id = Thread.current.object_id identity = "#{thread_id}@#{hostname}" - connection_class.new(host, port, identity) + connection_class.new(host, port, identity, configuration.converter) end end end diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index af5ef156..fd9a3cb2 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -8,13 +8,10 @@ require 'temporal/connection/errors' require 'temporal/connection/serializer' require 'temporal/connection/serializer/failure' -require 'temporal/concerns/payloads' module Temporal module Connection class GRPC - include Concerns::Payloads - WORKFLOW_ID_REUSE_POLICY = { allow_failed: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY, allow: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, @@ -30,9 +27,10 @@ class GRPC max_page_size: 100 }.freeze - def initialize(host, port, identity, options = {}) + def initialize(host, port, identity, converter, options = {}) @url = "#{host}:#{port}" @identity = identity + @converter = converter @poll = true @poll_mutex = Mutex.new @poll_request = nil @@ -102,17 +100,17 @@ def start_workflow_execution( task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new( name: task_queue ), - input: to_payloads(input), + input: converter.to_payloads(input), workflow_execution_timeout: execution_timeout, workflow_run_timeout: run_timeout, workflow_task_timeout: task_timeout, request_id: SecureRandom.uuid, header: Temporal::Api::Common::V1::Header.new( - fields: to_payload_map(headers || {}) + fields: converter.to_payload_map(headers || {}) ), cron_schedule: cron_schedule, memo: Temporal::Api::Common::V1::Memo.new( - fields: to_payload_map(memo || {}) + fields: converter.to_payload_map(memo || {}) ) ) @@ -141,7 +139,7 @@ def get_workflow_execution_history( event_type: :all, timeout: nil ) - if wait_for_new_event + if wait_for_new_event if timeout.nil? # This is an internal error. Wrappers should enforce this. raise "You must specify a timeout when wait_for_new_event = true." @@ -224,7 +222,7 @@ def record_activity_task_heartbeat(namespace:, task_token:, details: nil) request = Temporal::Api::WorkflowService::V1::RecordActivityTaskHeartbeatRequest.new( namespace: namespace, task_token: task_token, - details: to_details_payloads(details), + details: converter.to_details_payloads(details), identity: identity ) client.record_activity_task_heartbeat(request) @@ -239,7 +237,7 @@ def respond_activity_task_completed(namespace:, task_token:, result:) namespace: namespace, identity: identity, task_token: task_token, - result: to_result_payloads(result), + result: converter.to_result_payloads(result), ) client.respond_activity_task_completed(request) end @@ -251,7 +249,7 @@ def respond_activity_task_completed_by_id(namespace:, activity_id:, workflow_id: workflow_id: workflow_id, run_id: run_id, activity_id: activity_id, - result: to_result_payloads(result) + result: converter.to_result_payloads(result) ) client.respond_activity_task_completed_by_id(request) end @@ -282,7 +280,7 @@ def respond_activity_task_canceled(namespace:, task_token:, details: nil) request = Temporal::Api::WorkflowService::V1::RespondActivityTaskCanceledRequest.new( namespace: namespace, task_token: task_token, - details: to_details_payloads(details), + details: converter.to_details_payloads(details), identity: identity ) client.respond_activity_task_canceled(request) @@ -304,7 +302,7 @@ def signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input: run_id: run_id ), signal_name: signal, - input: to_signal_payloads(input), + input: converter.to_signal_payloads(input), identity: identity ) client.signal_workflow_execution(request) @@ -327,9 +325,9 @@ def signal_with_start_workflow_execution( memo: nil ) proto_header_fields = if headers.nil? - to_payload_map({}) + converter.to_payload_map({}) elsif headers.class == Hash - to_payload_map(headers) + converter.to_payload_map(headers) else # Preserve backward compatability for headers specified using proto objects warn '[DEPRECATION] Specify headers using a hash rather than protobuf objects' @@ -346,7 +344,7 @@ def signal_with_start_workflow_execution( task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new( name: task_queue ), - input: to_payloads(input), + input: converter.to_payloads(input), workflow_execution_timeout: execution_timeout, workflow_run_timeout: run_timeout, workflow_task_timeout: task_timeout, @@ -356,9 +354,9 @@ def signal_with_start_workflow_execution( ), cron_schedule: cron_schedule, signal_name: signal_name, - signal_input: to_signal_payloads(signal_input), + signal_input: converter.to_signal_payloads(signal_input), memo: Temporal::Api::Common::V1::Memo.new( - fields: to_payload_map(memo || {}) + fields: converter.to_payload_map(memo || {}) ), ) @@ -400,7 +398,7 @@ def terminate_workflow_execution( run_id: run_id, ), reason: reason, - details: to_details_payloads(details) + details: converter.to_details_payloads(details) ) client.terminate_workflow_execution(request) @@ -495,7 +493,7 @@ def cancel_polling_request private - attr_reader :url, :identity, :options, :poll_mutex, :poll_request + attr_reader :url, :identity, :converter, :options, :poll_mutex, :poll_request def client @client ||= Temporal::Api::WorkflowService::V1::WorkflowService::Stub.new( diff --git a/spec/unit/lib/temporal/connection_spec.rb b/spec/unit/lib/temporal/connection_spec.rb new file mode 100644 index 00000000..296519f9 --- /dev/null +++ b/spec/unit/lib/temporal/connection_spec.rb @@ -0,0 +1,18 @@ +require 'temporal/connection' +require 'temporal/configuration' + +describe Temporal::Connection do + let(:config) { Temporal::Configuration.new } + + describe '.generate' do + before { allow(Temporal::Connection::GRPC).to receive(:new) } + + it 'generates a new GRPC conection' do + described_class.generate(config.for_connection) + + expect(Temporal::Connection::GRPC) + .to have_received(:new) + .with(config.host, config.port, an_instance_of(String), config.converter) + end + end +end diff --git a/spec/unit/lib/temporal/grpc_client_spec.rb b/spec/unit/lib/temporal/grpc_client_spec.rb index 70a7885d..e2788516 100644 --- a/spec/unit/lib/temporal/grpc_client_spec.rb +++ b/spec/unit/lib/temporal/grpc_client_spec.rb @@ -1,5 +1,6 @@ describe Temporal::Connection::GRPC do - subject { Temporal::Connection::GRPC.new(nil, nil, nil) } + subject { Temporal::Connection::GRPC.new(nil, nil, nil, config.converter) } + let(:config) { Temporal::Configuration.new } let(:grpc_stub) { double('grpc stub') } let(:namespace) { 'test-namespace' } let(:workflow_id) { SecureRandom.uuid } From ed0d9cf10452f1dc9d088022a9a0cdbd818c7bef Mon Sep 17 00:00:00 2001 From: antstorm Date: Thu, 3 Mar 2022 19:45:58 +0000 Subject: [PATCH 04/16] Use ConverterWrapper for Serializers --- lib/temporal/connection/grpc.rb | 6 +++--- lib/temporal/connection/serializer.rb | 4 ++-- lib/temporal/connection/serializer/base.rb | 5 +++-- .../connection/serializer/complete_workflow.rb | 5 +---- lib/temporal/connection/serializer/continue_as_new.rb | 11 ++++------- lib/temporal/connection/serializer/failure.rb | 5 +---- lib/temporal/connection/serializer/record_marker.rb | 5 +---- .../connection/serializer/schedule_activity.rb | 7 ++----- .../connection/serializer/signal_external_workflow.rb | 5 +---- .../connection/serializer/start_child_workflow.rb | 11 ++++------- .../connection/serializer/upsert_search_attributes.rb | 5 +---- .../connection/serializer/continue_as_new_spec.rb | 6 ++++-- .../temporal/connection/serializer/failure_spec.rb | 4 +++- .../connection/serializer/retry_policy_spec.rb | 4 +++- .../serializer/upsert_search_attributes_spec.rb | 10 ++++------ 15 files changed, 37 insertions(+), 56 deletions(-) diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index fd9a3cb2..2c0c7d8b 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -196,7 +196,7 @@ def respond_workflow_task_failed(namespace:, task_token:, cause:, exception: nil identity: identity, task_token: task_token, cause: cause, - failure: Serializer::Failure.new(exception).to_proto + failure: Serializer::Failure.new(exception, converter).to_proto ) client.respond_workflow_task_failed(request) end @@ -259,7 +259,7 @@ def respond_activity_task_failed(namespace:, task_token:, exception:) namespace: namespace, identity: identity, task_token: task_token, - failure: Serializer::Failure.new(exception).to_proto + failure: Serializer::Failure.new(exception, converter).to_proto ) client.respond_activity_task_failed(request) end @@ -271,7 +271,7 @@ def respond_activity_task_failed_by_id(namespace:, activity_id:, workflow_id:, r workflow_id: workflow_id, run_id: run_id, activity_id: activity_id, - failure: Serializer::Failure.new(exception).to_proto + failure: Serializer::Failure.new(exception, converter).to_proto ) client.respond_activity_task_failed_by_id(request) end diff --git a/lib/temporal/connection/serializer.rb b/lib/temporal/connection/serializer.rb index 6343cb01..7ff0b73b 100644 --- a/lib/temporal/connection/serializer.rb +++ b/lib/temporal/connection/serializer.rb @@ -28,9 +28,9 @@ module Serializer Workflow::Command::UpsertSearchAttributes => Serializer::UpsertSearchAttributes, }.freeze - def self.serialize(object) + def self.serialize(object, converter) serializer = SERIALIZERS_MAP[object.class] - serializer.new(object).to_proto + serializer.new(object, converter).to_proto end end end diff --git a/lib/temporal/connection/serializer/base.rb b/lib/temporal/connection/serializer/base.rb index 9fcd49c5..79e8767a 100644 --- a/lib/temporal/connection/serializer/base.rb +++ b/lib/temporal/connection/serializer/base.rb @@ -6,8 +6,9 @@ module Temporal module Connection module Serializer class Base - def initialize(object) + def initialize(object, converter) @object = object + @converter = converter end def to_proto @@ -16,7 +17,7 @@ def to_proto private - attr_reader :object + attr_reader :object, :converter end end end diff --git a/lib/temporal/connection/serializer/complete_workflow.rb b/lib/temporal/connection/serializer/complete_workflow.rb index f228dbee..3dfa103a 100644 --- a/lib/temporal/connection/serializer/complete_workflow.rb +++ b/lib/temporal/connection/serializer/complete_workflow.rb @@ -1,18 +1,15 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class CompleteWorkflow < Base - include Concerns::Payloads - def to_proto Temporal::Api::Command::V1::Command.new( command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, complete_workflow_execution_command_attributes: Temporal::Api::Command::V1::CompleteWorkflowExecutionCommandAttributes.new( - result: to_result_payloads(object.result) + result: converter.to_result_payloads(object.result) ) ) end diff --git a/lib/temporal/connection/serializer/continue_as_new.rb b/lib/temporal/connection/serializer/continue_as_new.rb index 357f0008..cd113a9f 100644 --- a/lib/temporal/connection/serializer/continue_as_new.rb +++ b/lib/temporal/connection/serializer/continue_as_new.rb @@ -1,13 +1,10 @@ require 'temporal/connection/serializer/base' require 'temporal/connection/serializer/retry_policy' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class ContinueAsNew < Base - include Concerns::Payloads - def to_proto Temporal::Api::Command::V1::Command.new( command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, @@ -15,10 +12,10 @@ def to_proto Temporal::Api::Command::V1::ContinueAsNewWorkflowExecutionCommandAttributes.new( workflow_type: Temporal::Api::Common::V1::WorkflowType.new(name: object.workflow_type), task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(name: object.task_queue), - input: to_payloads(object.input), + input: converter.to_payloads(object.input), workflow_run_timeout: object.timeouts[:execution], workflow_task_timeout: object.timeouts[:task], - retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto, header: serialize_headers(object.headers), memo: serialize_memo(object.memo) ) @@ -30,13 +27,13 @@ def to_proto def serialize_headers(headers) return unless headers - Temporal::Api::Common::V1::Header.new(fields: to_payload_map(headers)) + Temporal::Api::Common::V1::Header.new(fields: converter.to_payload_map(headers)) end def serialize_memo(memo) return unless memo - Temporal::Api::Common::V1::Memo.new(fields: to_payload_map(memo)) + Temporal::Api::Common::V1::Memo.new(fields: converter.to_payload_map(memo)) end end end diff --git a/lib/temporal/connection/serializer/failure.rb b/lib/temporal/connection/serializer/failure.rb index 15dfc555..66d76424 100644 --- a/lib/temporal/connection/serializer/failure.rb +++ b/lib/temporal/connection/serializer/failure.rb @@ -1,19 +1,16 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class Failure < Base - include Concerns::Payloads - def to_proto Temporal::Api::Failure::V1::Failure.new( message: object.message, stack_trace: stack_trace_from(object.backtrace), application_failure_info: Temporal::Api::Failure::V1::ApplicationFailureInfo.new( type: object.class.name, - details: to_details_payloads(object.message) + details: converter.to_details_payloads(object.message) ) ) end diff --git a/lib/temporal/connection/serializer/record_marker.rb b/lib/temporal/connection/serializer/record_marker.rb index 133d79dc..87fca6a2 100644 --- a/lib/temporal/connection/serializer/record_marker.rb +++ b/lib/temporal/connection/serializer/record_marker.rb @@ -1,12 +1,9 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class RecordMarker < Base - include Concerns::Payloads - def to_proto Temporal::Api::Command::V1::Command.new( command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_RECORD_MARKER, @@ -14,7 +11,7 @@ def to_proto Temporal::Api::Command::V1::RecordMarkerCommandAttributes.new( marker_name: object.name, details: { - 'data' => to_details_payloads(object.details) + 'data' => converter.to_details_payloads(object.details) } ) ) diff --git a/lib/temporal/connection/serializer/schedule_activity.rb b/lib/temporal/connection/serializer/schedule_activity.rb index 93d3a207..dc027972 100644 --- a/lib/temporal/connection/serializer/schedule_activity.rb +++ b/lib/temporal/connection/serializer/schedule_activity.rb @@ -1,13 +1,10 @@ require 'temporal/connection/serializer/base' require 'temporal/connection/serializer/retry_policy' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class ScheduleActivity < Base - include Concerns::Payloads - def to_proto Temporal::Api::Command::V1::Command.new( command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, @@ -15,14 +12,14 @@ def to_proto Temporal::Api::Command::V1::ScheduleActivityTaskCommandAttributes.new( activity_id: object.activity_id.to_s, activity_type: Temporal::Api::Common::V1::ActivityType.new(name: object.activity_type), - input: to_payloads(object.input), + input: converter.to_payloads(object.input), namespace: object.namespace, task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(name: object.task_queue), schedule_to_close_timeout: object.timeouts[:schedule_to_close], schedule_to_start_timeout: object.timeouts[:schedule_to_start], start_to_close_timeout: object.timeouts[:start_to_close], heartbeat_timeout: object.timeouts[:heartbeat], - retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto, header: serialize_headers(object.headers) ) ) diff --git a/lib/temporal/connection/serializer/signal_external_workflow.rb b/lib/temporal/connection/serializer/signal_external_workflow.rb index 91907edd..cf835a28 100644 --- a/lib/temporal/connection/serializer/signal_external_workflow.rb +++ b/lib/temporal/connection/serializer/signal_external_workflow.rb @@ -1,12 +1,9 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class SignalExternalWorkflow < Base - include Concerns::Payloads - def to_proto Temporal::Api::Command::V1::Command.new( command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION, @@ -15,7 +12,7 @@ def to_proto namespace: object.namespace, execution: serialize_execution(object.execution), signal_name: object.signal_name, - input: to_signal_payloads(object.input), + input: converter.to_signal_payloads(object.input), control: "", # deprecated child_workflow_only: object.child_workflow_only ) diff --git a/lib/temporal/connection/serializer/start_child_workflow.rb b/lib/temporal/connection/serializer/start_child_workflow.rb index 5f6b350d..0b2b4009 100644 --- a/lib/temporal/connection/serializer/start_child_workflow.rb +++ b/lib/temporal/connection/serializer/start_child_workflow.rb @@ -1,13 +1,10 @@ require 'temporal/connection/serializer/base' require 'temporal/connection/serializer/retry_policy' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class StartChildWorkflow < Base - include Concerns::Payloads - def to_proto Temporal::Api::Command::V1::Command.new( command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION, @@ -17,11 +14,11 @@ def to_proto workflow_id: object.workflow_id.to_s, workflow_type: Temporal::Api::Common::V1::WorkflowType.new(name: object.workflow_type), task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(name: object.task_queue), - input: to_payloads(object.input), + input: converter.to_payloads(object.input), workflow_execution_timeout: object.timeouts[:execution], workflow_run_timeout: object.timeouts[:run], workflow_task_timeout: object.timeouts[:task], - retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy).to_proto, + retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto, header: serialize_headers(object.headers), memo: serialize_memo(object.memo), ) @@ -33,13 +30,13 @@ def to_proto def serialize_headers(headers) return unless headers - Temporal::Api::Common::V1::Header.new(fields: to_payload_map(headers)) + Temporal::Api::Common::V1::Header.new(fields: converter.to_payload_map(headers)) end def serialize_memo(memo) return unless memo - Temporal::Api::Common::V1::Memo.new(fields: to_payload_map(memo)) + Temporal::Api::Common::V1::Memo.new(fields: converter.to_payload_map(memo)) end end end diff --git a/lib/temporal/connection/serializer/upsert_search_attributes.rb b/lib/temporal/connection/serializer/upsert_search_attributes.rb index c11a8a0a..bc61bafb 100644 --- a/lib/temporal/connection/serializer/upsert_search_attributes.rb +++ b/lib/temporal/connection/serializer/upsert_search_attributes.rb @@ -1,19 +1,16 @@ require 'temporal/connection/serializer/base' -require 'temporal/concerns/payloads' module Temporal module Connection module Serializer class UpsertSearchAttributes < Base - include Concerns::Payloads - def to_proto Temporal::Api::Command::V1::Command.new( command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES, upsert_workflow_search_attributes_command_attributes: Temporal::Api::Command::V1::UpsertWorkflowSearchAttributesCommandAttributes.new( search_attributes: Temporal::Api::Common::V1::SearchAttributes.new( - indexed_fields: to_payload_map(object.search_attributes || {}) + indexed_fields: converter.to_payload_map(object.search_attributes || {}) ), ) ) diff --git a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb index 18de4355..a4868b33 100644 --- a/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/continue_as_new_spec.rb @@ -2,18 +2,20 @@ require 'temporal/workflow/command' describe Temporal::Connection::Serializer::ContinueAsNew do + let(:config) { Temporal::Configuration.new } + describe 'to_proto' do it 'produces a protobuf' do command = Temporal::Workflow::Command::ContinueAsNew.new( workflow_type: 'my-workflow-type', task_queue: 'my-task-queue', input: ['one', 'two'], - timeouts: Temporal.configuration.timeouts, + timeouts: config.timeouts, headers: {'foo-header': 'bar'}, memo: {'foo-memo': 'baz'}, ) - result = described_class.new(command).to_proto + result = described_class.new(command, config.converter).to_proto expect(result).to be_an_instance_of(Temporal::Api::Command::V1::Command) expect(result.command_type).to eql( diff --git a/spec/unit/lib/temporal/connection/serializer/failure_spec.rb b/spec/unit/lib/temporal/connection/serializer/failure_spec.rb index cff68c52..e3084c37 100644 --- a/spec/unit/lib/temporal/connection/serializer/failure_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/failure_spec.rb @@ -2,9 +2,11 @@ require 'temporal/workflow/command' describe Temporal::Connection::Serializer::Failure do + let(:config) { Temporal::Configuration.new } + describe 'to_proto' do it 'produces a protobuf' do - result = described_class.new(StandardError.new('test')).to_proto + result = described_class.new(StandardError.new('test'), config.converter).to_proto expect(result).to be_an_instance_of(Temporal::Api::Failure::V1::Failure) end diff --git a/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb b/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb index 211f807f..a4ee8ff0 100644 --- a/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/retry_policy_spec.rb @@ -2,6 +2,8 @@ require 'temporal/connection/serializer/retry_policy' describe Temporal::Connection::Serializer::RetryPolicy do + let(:config) { Temporal::Configuration.new } + describe 'to_proto' do let(:example_policy) do Temporal::RetryPolicy.new( @@ -14,7 +16,7 @@ end it 'converts to proto' do - proto = described_class.new(example_policy).to_proto + proto = described_class.new(example_policy, config.converter).to_proto expect(proto.initial_interval.seconds).to eq(1) expect(proto.backoff_coefficient).to eq(1.5) expect(proto.maximum_interval.seconds).to eq(5) diff --git a/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb b/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb index 21816b6c..1e1f596f 100644 --- a/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/upsert_search_attributes_spec.rb @@ -3,11 +3,9 @@ require 'temporal/connection/serializer/upsert_search_attributes' require 'temporal/workflow/command' -class TestDeserializer - extend Temporal::Concerns::Payloads -end - describe Temporal::Connection::Serializer::UpsertSearchAttributes do + let(:config) { Temporal::Configuration.new } + it 'produces a protobuf that round-trips' do expected_attributes = { 'CustomStringField' => 'moo', @@ -22,14 +20,14 @@ class TestDeserializer search_attributes: expected_attributes ) - result = described_class.new(command).to_proto + result = described_class.new(command, config.converter).to_proto expect(result).to be_an_instance_of(Temporal::Api::Command::V1::Command) expect(result.command_type).to eql( :COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES ) command_attributes = result.upsert_workflow_search_attributes_command_attributes expect(command_attributes).not_to be_nil - actual_attributes = TestDeserializer.from_payload_map(command_attributes&.search_attributes&.indexed_fields) + actual_attributes = config.converter.from_payload_map(command_attributes&.search_attributes&.indexed_fields) expect(actual_attributes).to eql(expected_attributes) end From 031bae751dc5b79ff140e613f3b9da807ad08fe8 Mon Sep 17 00:00:00 2001 From: antstorm Date: Thu, 3 Mar 2022 20:06:50 +0000 Subject: [PATCH 05/16] Use ConverterWrapper in Metadata, ExecutionInfo, StateManager, Client, TaskProcessor --- .../call_failing_activity_workflow_spec.rb | 5 ----- lib/temporal/activity/task_processor.rb | 7 ++---- lib/temporal/client.rb | 9 ++------ lib/temporal/metadata.rb | 17 ++++++-------- lib/temporal/workflow/errors.rb | 1 + lib/temporal/workflow/execution_info.rb | 7 +++--- lib/temporal/workflow/executor.rb | 4 ++-- lib/temporal/workflow/state_manager.rb | 22 +++++++++---------- .../temporal/activity/task_processor_spec.rb | 4 ++-- spec/unit/lib/temporal/metadata_spec.rb | 6 +++-- .../temporal/workflow/execution_info_spec.rb | 3 ++- .../temporal/workflow/state_manager_spec.rb | 5 +++-- 12 files changed, 38 insertions(+), 52 deletions(-) diff --git a/examples/spec/integration/call_failing_activity_workflow_spec.rb b/examples/spec/integration/call_failing_activity_workflow_spec.rb index c39853a6..090dd312 100644 --- a/examples/spec/integration/call_failing_activity_workflow_spec.rb +++ b/examples/spec/integration/call_failing_activity_workflow_spec.rb @@ -1,11 +1,6 @@ require 'workflows/call_failing_activity_workflow' describe CallFailingActivityWorkflow, :integration do - - class TestDeserializer - include Temporal::Concerns::Payloads - end - it 'correctly re-raises an activity-thrown exception in the workflow' do workflow_id = SecureRandom.uuid expected_message = "a failure message" diff --git a/lib/temporal/activity/task_processor.rb b/lib/temporal/activity/task_processor.rb index 27ee7ba4..7bf47f24 100644 --- a/lib/temporal/activity/task_processor.rb +++ b/lib/temporal/activity/task_processor.rb @@ -2,19 +2,16 @@ require 'temporal/error_handler' require 'temporal/errors' require 'temporal/activity/context' -require 'temporal/concerns/payloads' require 'temporal/connection/retryer' require 'temporal/connection' module Temporal class Activity class TaskProcessor - include Concerns::Payloads - def initialize(task, namespace, activity_lookup, middleware_chain, config) @task = task @namespace = namespace - @metadata = Metadata.generate_activity_metadata(task, namespace) + @metadata = Metadata.generate_activity_metadata(task, namespace, config.converter) @task_token = task.task_token @activity_name = task.activity_type.name @activity_class = activity_lookup.find(activity_name) @@ -35,7 +32,7 @@ def process end result = middleware_chain.invoke(metadata) do - activity_class.execute_in_context(context, from_payloads(task.input)) + activity_class.execute_in_context(context, config.converter.from_payloads(task.input)) end # Do not complete asynchronous activities, these should be completed manually diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index ac0b3d51..bf100905 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -216,7 +216,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam case closed_event.type when 'WORKFLOW_EXECUTION_COMPLETED' payloads = closed_event.attributes.result - return ResultConverter.from_result_payloads(payloads) + return config.converter.from_result_payloads(payloads) when 'WORKFLOW_EXECUTION_TIMED_OUT' raise Temporal::WorkflowTimedOut when 'WORKFLOW_EXECUTION_TERMINATED' @@ -306,7 +306,7 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id) run_id: run_id ) - Workflow::ExecutionInfo.generate_from(response.workflow_execution_info) + Workflow::ExecutionInfo.generate_from(response.workflow_execution_info, config.converter) end # Manually complete an activity @@ -372,11 +372,6 @@ def list_closed_workflow_executions(namespace, from, to = Time.now, filter: {}) fetch_executions(:closed, { namespace: namespace, from: from, to: to }.merge(filter)) end - class ResultConverter - extend Concerns::Payloads - end - private_constant :ResultConverter - private attr_reader :config diff --git a/lib/temporal/metadata.rb b/lib/temporal/metadata.rb index df7165ea..933050a3 100644 --- a/lib/temporal/metadata.rb +++ b/lib/temporal/metadata.rb @@ -2,15 +2,12 @@ require 'temporal/metadata/activity' require 'temporal/metadata/workflow' require 'temporal/metadata/workflow_task' -require 'temporal/concerns/payloads' module Temporal module Metadata class << self - include Concerns::Payloads - - def generate_activity_metadata(task, namespace) + def generate_activity_metadata(task, namespace, converter) Metadata::Activity.new( namespace: namespace, id: task.activity_id, @@ -20,8 +17,8 @@ def generate_activity_metadata(task, namespace) workflow_run_id: task.workflow_execution.run_id, workflow_id: task.workflow_execution.workflow_id, workflow_name: task.workflow_type.name, - headers: from_payload_map(task.header&.fields || {}), - heartbeat_details: from_details_payloads(task.heartbeat_details) + headers: converter.from_payload_map(task.header&.fields || {}), + heartbeat_details: converter.from_details_payloads(task.heartbeat_details) ) end @@ -38,9 +35,9 @@ def generate_workflow_task_metadata(task, namespace) end # @param event [Temporal::Workflow::History::Event] Workflow started history event - # @param event [WorkflowExecutionStartedEventAttributes] :attributes # @param task_metadata [Temporal::Metadata::WorkflowTask] workflow task metadata - def generate_workflow_metadata(event, task_metadata) + # @param converter [Temporal::ConverterWrapper] + def generate_workflow_metadata(event, task_metadata, converter) Metadata::Workflow.new( name: event.attributes.workflow_type.name, id: task_metadata.workflow_id, @@ -48,9 +45,9 @@ def generate_workflow_metadata(event, task_metadata) attempt: event.attributes.attempt, namespace: task_metadata.namespace, task_queue: event.attributes.task_queue.name, - headers: from_payload_map(event.attributes.header&.fields || {}), + headers: converter.from_payload_map(event.attributes.header&.fields || {}), run_started_at: event.timestamp, - memo: from_payload_map(event.attributes.memo&.fields || {}), + memo: converter.from_payload_map(event.attributes.memo&.fields || {}), ) end end diff --git a/lib/temporal/workflow/errors.rb b/lib/temporal/workflow/errors.rb index cea7f471..fedc7f78 100644 --- a/lib/temporal/workflow/errors.rb +++ b/lib/temporal/workflow/errors.rb @@ -1,4 +1,5 @@ require 'temporal/errors' +require 'temporal/concerns/payloads' module Temporal class Workflow diff --git a/lib/temporal/workflow/execution_info.rb b/lib/temporal/workflow/execution_info.rb index 76f46ff6..9ddf6f8a 100644 --- a/lib/temporal/workflow/execution_info.rb +++ b/lib/temporal/workflow/execution_info.rb @@ -1,4 +1,3 @@ -require 'temporal/concerns/payloads' require 'temporal/workflow/status' module Temporal @@ -16,7 +15,7 @@ class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, Temporal::Workflow::Status::TIMED_OUT, ] - def self.generate_from(response) + def self.generate_from(response, converter) new( workflow: response.type.name, workflow_id: response.execution.workflow_id, @@ -25,8 +24,8 @@ def self.generate_from(response) close_time: response.close_time&.to_time, status: Temporal::Workflow::Status::API_STATUS_MAP.fetch(response.status), history_length: response.history_length, - memo: self.from_payload_map(response.memo.fields), - search_attributes: self.from_payload_map(response.search_attributes.indexed_fields), + memo: converter.from_payload_map(response.memo.fields), + search_attributes: converter.from_payload_map(response.search_attributes.indexed_fields), ).freeze end diff --git a/lib/temporal/workflow/executor.rb b/lib/temporal/workflow/executor.rb index 78feb61b..7a3cfb10 100644 --- a/lib/temporal/workflow/executor.rb +++ b/lib/temporal/workflow/executor.rb @@ -16,7 +16,7 @@ class Executor def initialize(workflow_class, history, task_metadata, config) @workflow_class = workflow_class @dispatcher = Dispatcher.new - @state_manager = StateManager.new(dispatcher) + @state_manager = StateManager.new(dispatcher, config.converter) @history = history @task_metadata = task_metadata @config = config @@ -41,7 +41,7 @@ def run attr_reader :workflow_class, :dispatcher, :state_manager, :task_metadata, :history, :config def execute_workflow(input, workflow_started_event) - metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata) + metadata = Metadata.generate_workflow_metadata(workflow_started_event, task_metadata, config.converter) context = Workflow::Context.new(state_manager, dispatcher, workflow_class, metadata, config) Fiber.new do diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 8c991b08..2aa5be54 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -3,14 +3,11 @@ require 'temporal/workflow/command' require 'temporal/workflow/command_state_machine' require 'temporal/workflow/history/event_target' -require 'temporal/concerns/payloads' require 'temporal/workflow/errors' module Temporal class Workflow class StateManager - include Concerns::Payloads - SIDE_EFFECT_MARKER = 'SIDE_EFFECT'.freeze RELEASE_MARKER = 'RELEASE'.freeze @@ -19,8 +16,9 @@ class UnsupportedMarkerType < Temporal::InternalError; end attr_reader :commands, :local_time - def initialize(dispatcher) + def initialize(dispatcher, converter) @dispatcher = dispatcher + @converter = converter @commands = [] @marker_ids = Set.new @releases = {} @@ -89,7 +87,7 @@ def apply(history_window) private - attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases + attr_reader :dispatcher, :converter, :command_tracker, :marker_ids, :side_effects, :releases def next_event_id @last_event_id += 1 @@ -126,7 +124,7 @@ def apply_event(event) dispatch( History::EventTarget.workflow, 'started', - from_payloads(event.attributes.input), + converter.from_payloads(event.attributes.input), event, ) @@ -163,7 +161,7 @@ def apply_event(event) when 'ACTIVITY_TASK_COMPLETED' state_machine.complete - dispatch(target, 'completed', from_result_payloads(event.attributes.result)) + dispatch(target, 'completed', converter.from_result_payloads(event.attributes.result)) when 'ACTIVITY_TASK_FAILED' state_machine.fail @@ -184,7 +182,7 @@ def apply_event(event) when 'ACTIVITY_TASK_CANCELED' state_machine.cancel - dispatch(target, 'failed', Temporal::ActivityCanceled.new(from_details_payloads(event.attributes.details))) + dispatch(target, 'failed', Temporal::ActivityCanceled.new(converter.from_details_payloads(event.attributes.details))) when 'TIMER_STARTED' state_machine.start @@ -221,10 +219,10 @@ def apply_event(event) when 'MARKER_RECORDED' state_machine.complete - handle_marker(event.id, event.attributes.marker_name, from_details_payloads(event.attributes.details['data'])) + handle_marker(event.id, event.attributes.marker_name, converter.from_details_payloads(event.attributes.details['data'])) when 'WORKFLOW_EXECUTION_SIGNALED' - dispatch(target, 'signaled', event.attributes.signal_name, from_signal_payloads(event.attributes.input)) + dispatch(target, 'signaled', event.attributes.signal_name, converter.from_signal_payloads(event.attributes.input)) when 'WORKFLOW_EXECUTION_TERMINATED' # todo @@ -238,14 +236,14 @@ def apply_event(event) when 'START_CHILD_WORKFLOW_EXECUTION_FAILED' state_machine.fail - dispatch(target, 'failed', 'StandardError', from_payloads(event.attributes.cause)) + dispatch(target, 'failed', 'StandardError', converter.from_payloads(event.attributes.cause)) when 'CHILD_WORKFLOW_EXECUTION_STARTED' state_machine.start when 'CHILD_WORKFLOW_EXECUTION_COMPLETED' state_machine.complete - dispatch(target, 'completed', from_result_payloads(event.attributes.result)) + dispatch(target, 'completed', converter.from_result_payloads(event.attributes.result)) when 'CHILD_WORKFLOW_EXECUTION_FAILED' state_machine.fail diff --git a/spec/unit/lib/temporal/activity/task_processor_spec.rb b/spec/unit/lib/temporal/activity/task_processor_spec.rb index 87688a20..0c70d3ad 100644 --- a/spec/unit/lib/temporal/activity/task_processor_spec.rb +++ b/spec/unit/lib/temporal/activity/task_processor_spec.rb @@ -14,7 +14,7 @@ input: config.converter.to_payloads(input) ) end - let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace) } + let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace, config.converter) } let(:workflow_name) { task.workflow_type.name } let(:activity_name) { 'TestActivity' } let(:connection) { instance_double('Temporal::Connection::GRPC') } @@ -32,7 +32,7 @@ .and_return(connection) allow(Temporal::Metadata) .to receive(:generate_activity_metadata) - .with(task, namespace) + .with(task, namespace, config.converter) .and_return(metadata) allow(Temporal::Activity::Context).to receive(:new).with(connection, metadata).and_return(context) diff --git a/spec/unit/lib/temporal/metadata_spec.rb b/spec/unit/lib/temporal/metadata_spec.rb index cd21fb76..a77ffca6 100644 --- a/spec/unit/lib/temporal/metadata_spec.rb +++ b/spec/unit/lib/temporal/metadata_spec.rb @@ -1,8 +1,10 @@ require 'temporal/metadata' describe Temporal::Metadata do + let(:config) { Temporal::Configuration.new } + describe '.generate_activity_metadata' do - subject { described_class.generate_activity_metadata(data, namespace) } + subject { described_class.generate_activity_metadata(data, namespace, config.converter) } let(:data) { Fabricate(:api_activity_task) } let(:namespace) { 'test-namespace' } @@ -46,7 +48,7 @@ end context '.generate_workflow_metadata' do - subject { described_class.generate_workflow_metadata(event, task_metadata) } + subject { described_class.generate_workflow_metadata(event, task_metadata, config.converter) } let(:event) { Temporal::Workflow::History::Event.new(Fabricate(:api_workflow_execution_started_event)) } let(:task_metadata) { Fabricate(:workflow_task_metadata) } let(:namespace) { nil } diff --git a/spec/unit/lib/temporal/workflow/execution_info_spec.rb b/spec/unit/lib/temporal/workflow/execution_info_spec.rb index 4074a3d6..900c54b8 100644 --- a/spec/unit/lib/temporal/workflow/execution_info_spec.rb +++ b/spec/unit/lib/temporal/workflow/execution_info_spec.rb @@ -1,7 +1,8 @@ require 'temporal/workflow/execution_info' describe Temporal::Workflow::ExecutionInfo do - subject { described_class.generate_from(api_info) } + subject { described_class.generate_from(api_info, config.converter) } + let(:config) { Temporal::Configuration.new } let(:api_info) { Fabricate(:api_workflow_execution_info, workflow: 'TestWorkflow', workflow_id: '') } describe '.generate_for' do diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index 57745b71..087fa3ee 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -4,6 +4,7 @@ require 'temporal/errors' describe Temporal::Workflow::StateManager do + let(:config) { Temporal::Configuration.new } describe '#schedule' do class MyWorkflow < Temporal::Workflow; end @@ -22,7 +23,7 @@ class MyWorkflow < Temporal::Workflow; end ), ].each do |terminal_command| it "fails to validate if #{terminal_command.class} is not the last command scheduled" do - state_manager = described_class.new(Temporal::Workflow::Dispatcher.new) + state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, config.converter) next_command = Temporal::Workflow::Command::RecordMarker.new( name: Temporal::Workflow::StateManager::RELEASE_MARKER, @@ -36,4 +37,4 @@ class MyWorkflow < Temporal::Workflow; end end end end -end \ No newline at end of file +end From 6d98e1bb0ca1e2671e758a0c1be212292cd73330 Mon Sep 17 00:00:00 2001 From: antstorm Date: Thu, 3 Mar 2022 20:25:36 +0000 Subject: [PATCH 06/16] Use ConverterWrapper in Workflow::Errors and fabricators --- lib/temporal/client.rb | 2 +- lib/temporal/workflow/errors.rb | 9 +++------ lib/temporal/workflow/state_manager.rb | 13 ++++++++----- .../grpc/application_failure_fabricator.rb | 8 +++----- spec/fabricators/grpc/history_event_fabricator.rb | 7 ++----- spec/unit/lib/temporal/workflow/errors_spec.rb | 10 ++++++---- 6 files changed, 23 insertions(+), 26 deletions(-) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index bf100905..1ffad9b9 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -224,7 +224,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam when 'WORKFLOW_EXECUTION_CANCELED' raise Temporal::WorkflowCanceled when 'WORKFLOW_EXECUTION_FAILED' - raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure) + raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure, config.converter) when 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW' new_run_id = closed_event.attributes.new_execution_run_id # Throw to let the caller know they're not getting the result diff --git a/lib/temporal/workflow/errors.rb b/lib/temporal/workflow/errors.rb index fedc7f78..8b3f1a2d 100644 --- a/lib/temporal/workflow/errors.rb +++ b/lib/temporal/workflow/errors.rb @@ -1,17 +1,14 @@ require 'temporal/errors' -require 'temporal/concerns/payloads' module Temporal class Workflow class Errors - extend Concerns::Payloads - # Convert a failure returned from the server to an Error to raise to the client # failure: Temporal::Api::Failure::V1::Failure - def self.generate_error(failure, default_exception_class = StandardError) + def self.generate_error(failure, converter, default_exception_class = StandardError) case failure.failure_info when :application_failure_info - message = from_details_payloads(failure.application_failure_info.details) + message = converter.from_details_payloads(failure.application_failure_info.details) exception_class = safe_constantize(failure.application_failure_info.type) if exception_class.nil? @@ -47,7 +44,7 @@ def self.generate_error(failure, default_exception_class = StandardError) TimeoutError.new("Timeout type: #{failure.timeout_failure_info.timeout_type.to_s}") when :canceled_failure_info # TODO: Distinguish between different entity cancellations - StandardError.new(from_payloads(failure.canceled_failure_info.details)) + StandardError.new(converter.from_payloads(failure.canceled_failure_info.details)) else StandardError.new(failure.message) end diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 2aa5be54..ecd76ae6 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -165,11 +165,11 @@ def apply_event(event) when 'ACTIVITY_TASK_FAILED' state_machine.fail - dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure, ActivityException)) + dispatch(target, 'failed', generate_error(event.attributes.failure, ActivityException)) when 'ACTIVITY_TASK_TIMED_OUT' state_machine.time_out - dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(target, 'failed', generate_error(event.attributes.failure)) when 'ACTIVITY_TASK_CANCEL_REQUESTED' state_machine.requested @@ -247,15 +247,15 @@ def apply_event(event) when 'CHILD_WORKFLOW_EXECUTION_FAILED' state_machine.fail - dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(target, 'failed', generate_error(event.attributes.failure)) when 'CHILD_WORKFLOW_EXECUTION_CANCELED' state_machine.cancel - dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(target, 'failed', generate_error(event.attributes.failure)) when 'CHILD_WORKFLOW_EXECUTION_TIMED_OUT' state_machine.time_out - dispatch(target, 'failed', Temporal::Workflow::Errors.generate_error(event.attributes.failure)) + dispatch(target, 'failed', generate_error(event.attributes.failure)) when 'CHILD_WORKFLOW_EXECUTION_TERMINATED' # todo @@ -356,6 +356,9 @@ def track_release(release_name) end end + def generate_error(failure, default_exception_class = StandardError) + Temporal::Workflow::Errors.generate_error(failure, converter, default_exception_class) + end end end end diff --git a/spec/fabricators/grpc/application_failure_fabricator.rb b/spec/fabricators/grpc/application_failure_fabricator.rb index edf90c82..fde7c73a 100644 --- a/spec/fabricators/grpc/application_failure_fabricator.rb +++ b/spec/fabricators/grpc/application_failure_fabricator.rb @@ -1,7 +1,5 @@ -require 'temporal/concerns/payloads' -class TestDeserializer - include Temporal::Concerns::Payloads -end +require 'temporal/configuration' + # Simulates Temporal::Connection::Serializer::Failure Fabricator(:api_application_failure, from: Temporal::Api::Failure::V1::Failure) do transient :error_class, :backtrace @@ -10,7 +8,7 @@ class TestDeserializer application_failure_info do |attrs| Temporal::Api::Failure::V1::ApplicationFailureInfo.new( type: attrs[:error_class], - details: TestDeserializer.new.to_details_payloads(attrs[:message]), + details: Temporal::Configuration.new.converter.to_details_payloads(attrs[:message]), ) end end diff --git a/spec/fabricators/grpc/history_event_fabricator.rb b/spec/fabricators/grpc/history_event_fabricator.rb index 9e8538eb..276809e7 100644 --- a/spec/fabricators/grpc/history_event_fabricator.rb +++ b/spec/fabricators/grpc/history_event_fabricator.rb @@ -1,8 +1,5 @@ require 'securerandom' - -class TestSerializer - extend Temporal::Concerns::Payloads -end +require 'temporal/configuration' Fabricator(:api_history_event, from: Temporal::Api::History::V1::HistoryEvent) do event_id { 1 } @@ -130,7 +127,7 @@ class TestSerializer event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_CANCELED } activity_task_canceled_event_attributes do |attrs| Temporal::Api::History::V1::ActivityTaskCanceledEventAttributes.new( - details: TestSerializer.to_details_payloads('ACTIVITY_ID_NOT_STARTED'), + details: Temporal::Configuration.new.converter.to_details_payloads('ACTIVITY_ID_NOT_STARTED'), scheduled_event_id: attrs[:event_id] - 2, started_event_id: nil, identity: 'test-worker@test-host' diff --git a/spec/unit/lib/temporal/workflow/errors_spec.rb b/spec/unit/lib/temporal/workflow/errors_spec.rb index 05d5f82a..be61a1dc 100644 --- a/spec/unit/lib/temporal/workflow/errors_spec.rb +++ b/spec/unit/lib/temporal/workflow/errors_spec.rb @@ -16,6 +16,8 @@ def initialize(message) class SomeError < StandardError; end describe Temporal::Workflow::Errors do + let(:config) { Temporal::Configuration.new } + describe '.generate_error' do it "instantiates properly when the client has the error" do message = "An error message" @@ -27,7 +29,7 @@ class SomeError < StandardError; end error_class: SomeError.to_s ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, config.converter) expect(e).to be_a(SomeError) expect(e.message).to eq(message) expect(e.backtrace).to eq(stack_trace) @@ -46,7 +48,7 @@ class SomeError < StandardError; end error_class: 'NonexistentError', ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, config.converter) expect(e).to be_a(StandardError) expect(e.message).to eq("NonexistentError: An error message") expect(e.backtrace).to eq(stack_trace) @@ -72,7 +74,7 @@ class SomeError < StandardError; end error_class: ErrorWithTwoArgs.to_s, ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, config.converter) expect(e).to be_a(StandardError) expect(e.message).to eq("ErrorWithTwoArgs: An error message") expect(e.backtrace).to eq(stack_trace) @@ -100,7 +102,7 @@ class SomeError < StandardError; end error_class: ErrorThatRaisesInInitialize.to_s, ) - e = Temporal::Workflow::Errors.generate_error(failure) + e = Temporal::Workflow::Errors.generate_error(failure, config.converter) expect(e).to be_a(StandardError) expect(e.message).to eq("ErrorThatRaisesInInitialize: An error message") expect(e.backtrace).to eq(stack_trace) From 9857a841d9106850fe9af8f7cf58a612c47e247d Mon Sep 17 00:00:00 2001 From: antstorm Date: Thu, 3 Mar 2022 20:39:16 +0000 Subject: [PATCH 07/16] Remove Concerns::Payloads --- lib/temporal/concerns/payloads.rb | 59 ------------------------------- 1 file changed, 59 deletions(-) delete mode 100644 lib/temporal/concerns/payloads.rb diff --git a/lib/temporal/concerns/payloads.rb b/lib/temporal/concerns/payloads.rb deleted file mode 100644 index 3be276d2..00000000 --- a/lib/temporal/concerns/payloads.rb +++ /dev/null @@ -1,59 +0,0 @@ -module Temporal - module Concerns - module Payloads - def from_payloads(payloads) - payload_converter.from_payloads(payloads) - end - - def from_payload(payload) - payload_converter.from_payload(payload) - end - - def from_result_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_details_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_signal_payloads(payloads) - from_payloads(payloads)&.first - end - - def from_payload_map(payload_map) - payload_map.map { |key, value| [key, from_payload(value)] }.to_h - end - - def to_payloads(data) - payload_converter.to_payloads(data) - end - - def to_payload(data) - payload_converter.to_payload(data) - end - - def to_result_payloads(data) - to_payloads([data]) - end - - def to_details_payloads(data) - to_payloads([data]) - end - - def to_signal_payloads(data) - to_payloads([data]) - end - - def to_payload_map(data) - data.transform_values(&method(:to_payload)) - end - - private - - def payload_converter - Temporal.configuration.converter - end - end - end -end From da2679b1dd12a51ebf520ae3969eafee21afdfe8 Mon Sep 17 00:00:00 2001 From: antstorm Date: Thu, 3 Mar 2022 21:00:33 +0000 Subject: [PATCH 08/16] Cleanup spec fabricators from explicit references to Temporal.configuration.converter --- examples/spec/integration/converter_spec.rb | 4 +--- spec/config/temporal.rb | 7 ++++++- spec/fabricators/grpc/activity_task_fabricator.rb | 7 ++----- spec/fabricators/grpc/application_failure_fabricator.rb | 4 +--- spec/fabricators/grpc/history_event_fabricator.rb | 8 +++----- spec/fabricators/grpc/memo_fabricator.rb | 2 +- spec/fabricators/grpc/search_attributes_fabricator.rb | 2 +- ...kflow_execution_started_event_attributes_fabricator.rb | 5 +---- spec/unit/lib/temporal/client_spec.rb | 6 +----- 9 files changed, 17 insertions(+), 28 deletions(-) diff --git a/examples/spec/integration/converter_spec.rb b/examples/spec/integration/converter_spec.rb index ce1ea66a..e4fccbdb 100644 --- a/examples/spec/integration/converter_spec.rb +++ b/examples/spec/integration/converter_spec.rb @@ -60,8 +60,6 @@ completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first result = completion_event.workflow_execution_completed_event_attributes.result - converter = Temporal.configuration.converter - - expect(converter.from_payloads(result)&.first).to eq('Hello World, Tom') + expect($converter.from_payloads(result)&.first).to eq('Hello World, Tom') end end diff --git a/spec/config/temporal.rb b/spec/config/temporal.rb index 0d868ffe..5bb173c0 100644 --- a/spec/config/temporal.rb +++ b/spec/config/temporal.rb @@ -1,5 +1,10 @@ +require 'temporal/converter_wrapper' +require 'temporal/configuration' + +$converter = Temporal::ConverterWrapper.new(Temporal::Configuration::DEFAULT_CONVERTER) + RSpec.configure do |config| config.before(:each) do Temporal.configuration.error_handlers.clear end -end \ No newline at end of file +end diff --git a/spec/fabricators/grpc/activity_task_fabricator.rb b/spec/fabricators/grpc/activity_task_fabricator.rb index b6fc43fc..0f962a6a 100644 --- a/spec/fabricators/grpc/activity_task_fabricator.rb +++ b/spec/fabricators/grpc/activity_task_fabricator.rb @@ -6,15 +6,12 @@ activity_id { SecureRandom.uuid } task_token { |attrs| attrs[:task_token] || SecureRandom.uuid } activity_type { Fabricate(:api_activity_type) } - input { Temporal.configuration.converter.to_payloads(nil) } + input { $converter.to_payloads(nil) } workflow_type { Fabricate(:api_workflow_type) } workflow_execution { Fabricate(:api_workflow_execution) } current_attempt_scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } started_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } } header do |attrs| - fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h| - h[field] = Temporal.configuration.converter.to_payload(value) - end - Temporal::Api::Common::V1::Header.new(fields: fields) + Temporal::Api::Common::V1::Header.new(fields: $converter.to_payload_map(attrs[:headers] || {})) end end diff --git a/spec/fabricators/grpc/application_failure_fabricator.rb b/spec/fabricators/grpc/application_failure_fabricator.rb index fde7c73a..ec9f2462 100644 --- a/spec/fabricators/grpc/application_failure_fabricator.rb +++ b/spec/fabricators/grpc/application_failure_fabricator.rb @@ -1,5 +1,3 @@ -require 'temporal/configuration' - # Simulates Temporal::Connection::Serializer::Failure Fabricator(:api_application_failure, from: Temporal::Api::Failure::V1::Failure) do transient :error_class, :backtrace @@ -8,7 +6,7 @@ application_failure_info do |attrs| Temporal::Api::Failure::V1::ApplicationFailureInfo.new( type: attrs[:error_class], - details: Temporal::Configuration.new.converter.to_details_payloads(attrs[:message]), + details: $converter.to_details_payloads(attrs[:message]), ) end end diff --git a/spec/fabricators/grpc/history_event_fabricator.rb b/spec/fabricators/grpc/history_event_fabricator.rb index 276809e7..80ad087d 100644 --- a/spec/fabricators/grpc/history_event_fabricator.rb +++ b/spec/fabricators/grpc/history_event_fabricator.rb @@ -1,5 +1,4 @@ require 'securerandom' -require 'temporal/configuration' Fabricator(:api_history_event, from: Temporal::Api::History::V1::HistoryEvent) do event_id { 1 } @@ -11,10 +10,9 @@ event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED } event_time { Time.now } workflow_execution_started_event_attributes do |attrs| - header_fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h| - h[field] = Temporal.configuration.converter.to_payload(value) - end - header = Temporal::Api::Common::V1::Header.new(fields: header_fields) + header = Temporal::Api::Common::V1::Header.new( + fields: $converter.to_payload_map(attrs[:headers] || {}) + ) Temporal::Api::History::V1::WorkflowExecutionStartedEventAttributes.new( workflow_type: Fabricate(:api_workflow_type), diff --git a/spec/fabricators/grpc/memo_fabricator.rb b/spec/fabricators/grpc/memo_fabricator.rb index 6c9fd726..2400b1b6 100644 --- a/spec/fabricators/grpc/memo_fabricator.rb +++ b/spec/fabricators/grpc/memo_fabricator.rb @@ -1,7 +1,7 @@ Fabricator(:memo, from: Temporal::Api::Common::V1::Memo) do fields do Google::Protobuf::Map.new(:string, :message, Temporal::Api::Common::V1::Payload).tap do |m| - m['foo'] = Temporal.configuration.converter.to_payload('bar') + m['foo'] = $converter.to_payload('bar') end end end diff --git a/spec/fabricators/grpc/search_attributes_fabricator.rb b/spec/fabricators/grpc/search_attributes_fabricator.rb index f201abd7..91a38d29 100644 --- a/spec/fabricators/grpc/search_attributes_fabricator.rb +++ b/spec/fabricators/grpc/search_attributes_fabricator.rb @@ -1,7 +1,7 @@ Fabricator(:search_attributes, from: Temporal::Api::Common::V1::SearchAttributes) do indexed_fields do Google::Protobuf::Map.new(:string, :message, Temporal::Api::Common::V1::Payload).tap do |m| - m['foo'] = Temporal.configuration.converter.to_payload('bar') + m['foo'] = $converter.to_payload('bar') end end end diff --git a/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb b/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb index 0cc19e16..9f7999b2 100644 --- a/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb +++ b/spec/fabricators/grpc/workflow_execution_started_event_attributes_fabricator.rb @@ -11,9 +11,6 @@ attempt 1 task_queue { Fabricate(:api_task_queue) } header do |attrs| - fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h| - h[field] = Temporal.configuration.converter.to_payload(value) - end - Temporal::Api::Common::V1::Header.new(fields: fields) + Temporal::Api::Common::V1::Header.new(fields: $converter.to_payload_map(attrs[:headers] || {})) end end diff --git a/spec/unit/lib/temporal/client_spec.rb b/spec/unit/lib/temporal/client_spec.rb index 321207f7..af44951c 100644 --- a/spec/unit/lib/temporal/client_spec.rb +++ b/spec/unit/lib/temporal/client_spec.rb @@ -440,11 +440,7 @@ class NamespacedWorkflow < Temporal::Workflow ['string', 'a result'], ].each do |(type, expected_result)| it "completes and returns a #{type}" do - payload = Temporal::Api::Common::V1::Payloads.new( - payloads: [ - Temporal.configuration.converter.to_payload(expected_result) - ], - ) + payload = $converter.to_result_payloads(expected_result) completed_event = Fabricate(:workflow_completed_event, result: payload) response = Fabricate(:workflow_execution_history, events: [completed_event]) expect(connection) From b4517a9ead20fecced74bece2ebb3cbc9403b054 Mon Sep 17 00:00:00 2001 From: antstorm Date: Fri, 4 Mar 2022 15:34:27 +0000 Subject: [PATCH 09/16] fixup! Use ConverterWrapper for Serializers --- lib/temporal/connection/grpc.rb | 2 +- lib/temporal/connection/serializer/fail_workflow.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/temporal/connection/grpc.rb b/lib/temporal/connection/grpc.rb index 2c0c7d8b..3c87ba32 100644 --- a/lib/temporal/connection/grpc.rb +++ b/lib/temporal/connection/grpc.rb @@ -185,7 +185,7 @@ def respond_workflow_task_completed(namespace:, task_token:, commands:) namespace: namespace, identity: identity, task_token: task_token, - commands: Array(commands).map { |(_, command)| Serializer.serialize(command) } + commands: Array(commands).map { |(_, command)| Serializer.serialize(command, converter) } ) client.respond_workflow_task_completed(request) end diff --git a/lib/temporal/connection/serializer/fail_workflow.rb b/lib/temporal/connection/serializer/fail_workflow.rb index 0cc79725..4eae4e13 100644 --- a/lib/temporal/connection/serializer/fail_workflow.rb +++ b/lib/temporal/connection/serializer/fail_workflow.rb @@ -10,7 +10,7 @@ def to_proto command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION, fail_workflow_execution_command_attributes: Temporal::Api::Command::V1::FailWorkflowExecutionCommandAttributes.new( - failure: Failure.new(object.exception).to_proto + failure: Failure.new(object.exception, converter).to_proto ) ) end From c0b631dcbc1b852c9944ca9733a9a56506599a17 Mon Sep 17 00:00:00 2001 From: antstorm Date: Fri, 4 Mar 2022 20:49:58 +0000 Subject: [PATCH 10/16] fixup! Use ConverterWrapper in Metadata, ExecutionInfo, StateManager, Client, TaskProcessor --- lib/temporal/workflow/execution_info.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/temporal/workflow/execution_info.rb b/lib/temporal/workflow/execution_info.rb index 9ddf6f8a..8e8c64d4 100644 --- a/lib/temporal/workflow/execution_info.rb +++ b/lib/temporal/workflow/execution_info.rb @@ -3,8 +3,6 @@ module Temporal class Workflow class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, :close_time, :status, :history_length, :memo, :search_attributes, keyword_init: true) - extend Concerns::Payloads - STATUSES = [ Temporal::Workflow::Status::RUNNING, Temporal::Workflow::Status::COMPLETED, From ae91e9721e2d1dab143e521c605aec476a0cb0b4 Mon Sep 17 00:00:00 2001 From: antstorm Date: Fri, 4 Mar 2022 20:51:12 +0000 Subject: [PATCH 11/16] fixup! Use ConverterWrapper in Metadata, ExecutionInfo, StateManager, Client, TaskProcessor --- lib/temporal/client.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index 1ffad9b9..9de68b49 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -442,7 +442,7 @@ def fetch_executions(status, request_options) end executions.map do |raw_execution| - Temporal::Workflow::ExecutionInfo.generate_from(raw_execution) + Temporal::Workflow::ExecutionInfo.generate_from(raw_execution, config.converter) end end end From 8d58b57d2b4cd5c2b8d79f9ffb4ad63eacb2d7b4 Mon Sep 17 00:00:00 2001 From: antstorm Date: Sun, 6 Mar 2022 13:06:22 +0000 Subject: [PATCH 12/16] Fix converter spec --- examples/spec/integration/converter_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/spec/integration/converter_spec.rb b/examples/spec/integration/converter_spec.rb index e4fccbdb..a3fccdbd 100644 --- a/examples/spec/integration/converter_spec.rb +++ b/examples/spec/integration/converter_spec.rb @@ -60,6 +60,6 @@ completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first result = completion_event.workflow_execution_completed_event_attributes.result - expect($converter.from_payloads(result)&.first).to eq('Hello World, Tom') + expect(Temporal.configuration.converter.from_payloads(result)&.first).to eq('Hello World, Tom') end end From 05d01407e496e852f952b88dd1079a2ffac96eec Mon Sep 17 00:00:00 2001 From: antstorm Date: Mon, 21 Mar 2022 17:05:39 +0000 Subject: [PATCH 13/16] Fix comma --- lib/temporal/metadata.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/temporal/metadata.rb b/lib/temporal/metadata.rb index 6f71a4bb..09a81955 100644 --- a/lib/temporal/metadata.rb +++ b/lib/temporal/metadata.rb @@ -18,7 +18,7 @@ def generate_activity_metadata(task, namespace, converter) workflow_id: task.workflow_execution.workflow_id, workflow_name: task.workflow_type.name, headers: converter.from_payload_map(task.header&.fields || {}), - heartbeat_details: converter.from_details_payloads(task.heartbeat_details) + heartbeat_details: converter.from_details_payloads(task.heartbeat_details), scheduled_at: task.scheduled_time.to_time, current_attempt_scheduled_at: task.current_attempt_scheduled_time.to_time ) From 26673ed3b3c7ed5769a5ef811d0cc0e6518bb6c5 Mon Sep 17 00:00:00 2001 From: antstorm Date: Mon, 21 Mar 2022 17:12:09 +0000 Subject: [PATCH 14/16] Adopt explicit converter in newly published changes --- lib/temporal/workflow/execution_info.rb | 2 +- .../connection/serializer/start_child_workflow_spec.rb | 5 +++-- spec/unit/lib/temporal/workflow/execution_info_spec.rb | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/temporal/workflow/execution_info.rb b/lib/temporal/workflow/execution_info.rb index 63b2550a..7f7ccfed 100644 --- a/lib/temporal/workflow/execution_info.rb +++ b/lib/temporal/workflow/execution_info.rb @@ -14,7 +14,7 @@ class ExecutionInfo < Struct.new(:workflow, :workflow_id, :run_id, :start_time, ] def self.generate_from(response, converter) - search_attributes = response.search_attributes.nil? ? {} : self.from_payload_map(response.search_attributes.indexed_fields) + search_attributes = converter.from_payload_map(response.search_attributes&.indexed_fields || {}) new( workflow: response.type.name, diff --git a/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb b/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb index 5e3278b4..9f9cc839 100644 --- a/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb +++ b/spec/unit/lib/temporal/connection/serializer/start_child_workflow_spec.rb @@ -3,6 +3,7 @@ require 'temporal/connection/serializer/start_child_workflow' describe Temporal::Connection::Serializer::StartChildWorkflow do + let(:config) { Temporal::Configuration.new } let(:example_command) do Temporal::Workflow::Command::StartChildWorkflow.new( workflow_id: SecureRandom.uuid, @@ -23,7 +24,7 @@ command.parent_close_policy = :invalid expect do - described_class.new(command).to_proto + described_class.new(command, config.converter).to_proto end.to raise_error(Temporal::Connection::ArgumentError) do |e| expect(e.message).to eq("Unknown parent_close_policy '#{command.parent_close_policy}' specified") end @@ -39,7 +40,7 @@ command = example_command command.parent_close_policy = policy_name - result = described_class.new(command).to_proto + result = described_class.new(command, config.converter).to_proto attribs = result.start_child_workflow_execution_command_attributes expect(attribs.parent_close_policy).to eq(expected_parent_close_policy) end diff --git a/spec/unit/lib/temporal/workflow/execution_info_spec.rb b/spec/unit/lib/temporal/workflow/execution_info_spec.rb index 1ce42664..e693b8ac 100644 --- a/spec/unit/lib/temporal/workflow/execution_info_spec.rb +++ b/spec/unit/lib/temporal/workflow/execution_info_spec.rb @@ -25,7 +25,7 @@ it 'deserializes if search_attributes is nil' do api_info.search_attributes = nil - result = described_class.generate_from(api_info) + result = described_class.generate_from(api_info, config.converter) expect(result.search_attributes).to eq({}) end end From a14d911396f14c1d6150bd50eed9854081bf4985 Mon Sep 17 00:00:00 2001 From: antstorm Date: Mon, 21 Mar 2022 17:43:15 +0000 Subject: [PATCH 15/16] Fix crypt converter spec to avoid modification of the global config --- examples/spec/integration/converter_spec.rb | 45 +++++++++------------ 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/examples/spec/integration/converter_spec.rb b/examples/spec/integration/converter_spec.rb index 949888bb..dacd437a 100644 --- a/examples/spec/integration/converter_spec.rb +++ b/examples/spec/integration/converter_spec.rb @@ -3,68 +3,61 @@ require 'grpc/errors' describe 'Converter', :integration do - around(:each) do |example| - task_queue = Temporal.configuration.task_queue - - Temporal.configure do |config| + let(:config) do + Temporal.configuration.dup.tap do |config| config.task_queue = 'crypt' config.converter = Temporal::CryptConverter.new( payload_converter: Temporal::Configuration::DEFAULT_CONVERTER ) end - - example.run - ensure - Temporal.configure do |config| - config.task_queue = task_queue - config.converter = Temporal::Configuration::DEFAULT_CONVERTER - end end + let(:client) { Temporal::Client.new(config) } it 'can encrypt payloads' do - workflow_id, run_id = run_workflow(HelloWorldWorkflow, 'Tom') + workflow_id = SecureRandom.uuid + run_id = client.start_workflow(HelloWorldWorkflow, 'Tom', options: { workflow_id: workflow_id }) begin - wait_for_workflow_completion(workflow_id, run_id) + client.await_workflow_result(HelloWorldWorkflow, workflow_id: workflow_id, run_id: run_id) rescue GRPC::DeadlineExceeded raise "Encrypted-payload workflow didn't run. Make sure you run USE_ENCRYPTION=1 ./bin/worker and try again." end - result = fetch_history(workflow_id, run_id) + history = client.get_workflow_history(namespace: config.namespace, workflow_id: workflow_id, run_id: run_id) - events = result.history.events.group_by(&:event_type) + events = history.events.group_by(&:type) - events[:EVENT_TYPE_WORKFLOW_EXECUTION_STARTED].map do |event| - input = event.workflow_execution_started_event_attributes.input + events['WORKFLOW_EXECUTION_STARTED'].map do |event| + input = event.attributes.input input.payloads.each do |payload| expect(payload.metadata['encoding']).to eq('binary/encrypted') end end - events[:EVENT_TYPE_ACTIVITY_TASK_SCHEDULED].map do |event| - input = event.activity_task_scheduled_event_attributes.input + events['ACTIVITY_TASK_SCHEDULED'].map do |event| + input = event.attributes.input input.payloads.each do |payload| expect(payload.metadata['encoding']).to eq('binary/encrypted') end end - events[:EVENT_TYPE_ACTIVITY_TASK_COMPLETED].map do |event| - result = event.activity_task_completed_event_attributes.result + events['ACTIVITY_TASK_COMPLETED'].map do |event| + result = event.attributes.result result.payloads.each do |payload| expect(payload.metadata['encoding']).to eq('binary/encrypted') end end - events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].map do |event| - result = event.workflow_execution_completed_event_attributes.result + events['WORKFLOW_EXECUTION_COMPLETED'].map do |event| + result = event.attributes.result result.payloads.each do |payload| expect(payload.metadata['encoding']).to eq('binary/encrypted') end end - completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first - result = completion_event.workflow_execution_completed_event_attributes.result + completion_event = events['WORKFLOW_EXECUTION_COMPLETED'].first + result = completion_event.attributes.result - expect(Temporal.configuration.converter.from_payloads(result)&.first).to eq('Hello World, Tom') + expect(config.converter.from_payloads(result)&.first).to eq('Hello World, Tom') end end From 1c9a1de8969a38688177f86cfdcfd34a685894e9 Mon Sep 17 00:00:00 2001 From: antstorm Date: Thu, 24 Mar 2022 16:34:29 +0000 Subject: [PATCH 16/16] fixup! Cleanup spec fabricators from explicit references to Temporal.configuration.converter --- spec/fabricators/grpc/history_event_fabricator.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/fabricators/grpc/history_event_fabricator.rb b/spec/fabricators/grpc/history_event_fabricator.rb index 80ad087d..c27939a5 100644 --- a/spec/fabricators/grpc/history_event_fabricator.rb +++ b/spec/fabricators/grpc/history_event_fabricator.rb @@ -125,7 +125,7 @@ event_type { Temporal::Api::Enums::V1::EventType::EVENT_TYPE_ACTIVITY_TASK_CANCELED } activity_task_canceled_event_attributes do |attrs| Temporal::Api::History::V1::ActivityTaskCanceledEventAttributes.new( - details: Temporal::Configuration.new.converter.to_details_payloads('ACTIVITY_ID_NOT_STARTED'), + details: $converter.to_details_payloads('ACTIVITY_ID_NOT_STARTED'), scheduled_event_id: attrs[:event_id] - 2, started_event_id: nil, identity: 'test-worker@test-host'