Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Remove deprecation warning #284

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
require 'workflows/call_failing_activity_workflow'

describe CallFailingActivityWorkflow, :integration do

class TestDeserializer
include Temporal::Concerns::Payloads
end

it 'correctly re-raises an activity-thrown exception in the workflow' do
workflow_id = SecureRandom.uuid
expected_message = "a failure message"
Expand Down
47 changes: 19 additions & 28 deletions examples/spec/integration/converter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,72 +3,63 @@
require 'grpc/errors'

describe 'Converter', :integration do
around(:each) do |example|
task_queue = Temporal.configuration.task_queue

Temporal.configure do |config|
let(:config) do
Temporal.configuration.dup.tap do |config|
config.task_queue = 'crypt'
config.payload_codec = Temporal::Connection::Converter::Codec::Chain.new(
payload_codecs: [
Temporal::CryptPayloadCodec.new
]
)
end

example.run
ensure
Temporal.configure do |config|
config.task_queue = task_queue
config.payload_codec = Temporal::Configuration::DEFAULT_PAYLOAD_CODEC
end
end
let(:client) { Temporal::Client.new(config) }

it 'can encrypt payloads' do
workflow_id, run_id = run_workflow(HelloWorldWorkflow, 'Tom')
workflow_id = SecureRandom.uuid
run_id = client.start_workflow(HelloWorldWorkflow, 'Tom', options: { workflow_id: workflow_id })

begin
wait_for_workflow_completion(workflow_id, run_id)
client.await_workflow_result(HelloWorldWorkflow, workflow_id: workflow_id, run_id: run_id)
rescue GRPC::DeadlineExceeded
raise "Encrypted-payload workflow didn't run. Make sure you run USE_ENCRYPTION=1 ./bin/worker and try again."
end

result = fetch_history(workflow_id, run_id)
history = client.get_workflow_history(namespace: config.namespace, workflow_id: workflow_id, run_id: run_id)

events = result.history.events.group_by(&:event_type)
events = history.events.group_by(&:type)

events[:EVENT_TYPE_WORKFLOW_EXECUTION_STARTED].map do |event|
input = event.workflow_execution_started_event_attributes.input
events['WORKFLOW_EXECUTION_STARTED'].map do |event|
input = event.attributes.input
input.payloads.each do |payload|
expect(payload.metadata['encoding']).to eq('binary/encrypted')
end
end

events[:EVENT_TYPE_ACTIVITY_TASK_SCHEDULED].map do |event|
input = event.activity_task_scheduled_event_attributes.input
events['ACTIVITY_TASK_SCHEDULED'].map do |event|
input = event.attributes.input
input.payloads.each do |payload|
expect(payload.metadata['encoding']).to eq('binary/encrypted')
end
end

events[:EVENT_TYPE_ACTIVITY_TASK_COMPLETED].map do |event|
result = event.activity_task_completed_event_attributes.result
events['ACTIVITY_TASK_COMPLETED'].map do |event|
result = event.attributes.result
result.payloads.each do |payload|
expect(payload.metadata['encoding']).to eq('binary/encrypted')
end
end

events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].map do |event|
result = event.workflow_execution_completed_event_attributes.result
events['WORKFLOW_EXECUTION_COMPLETED'].map do |event|
result = event.attributes.result
result.payloads.each do |payload|
expect(payload.metadata['encoding']).to eq('binary/encrypted')
end
end

completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first
result = completion_event.workflow_execution_completed_event_attributes.result

payload_codec = Temporal.configuration.payload_codec
completion_event = events['WORKFLOW_EXECUTION_COMPLETED'].first
result = completion_event.attributes.result

expect(payload_codec.decodes(result).payloads.first.data).to eq('"Hello World, Tom"')
expect(config.converter.from_payloads(result)&.first).to eq('Hello World, Tom')
end
end
6 changes: 2 additions & 4 deletions lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,19 @@
require 'temporal/error_handler'
require 'temporal/errors'
require 'temporal/activity/context'
require 'temporal/concerns/payloads'
require 'temporal/connection/retryer'
require 'temporal/connection'
require 'temporal/metric_keys'

module Temporal
class Activity
class TaskProcessor
include Concerns::Payloads

def initialize(task, task_queue, namespace, activity_lookup, middleware_chain, config, heartbeat_thread_pool)
@task = task
@task_queue = task_queue
@namespace = namespace
@metadata = Metadata.generate_activity_metadata(task, namespace)
@metadata = Metadata.generate_activity_metadata(task, namespace, config.converter)
@task_token = task.task_token
@activity_name = task.activity_type.name
@activity_class = activity_lookup.find(activity_name)
Expand All @@ -38,7 +36,7 @@ def process
end

result = middleware_chain.invoke(metadata) do
activity_class.execute_in_context(context, from_payloads(task.input))
activity_class.execute_in_context(context, config.converter.from_payloads(task.input))
end

# Do not complete asynchronous activities, these should be completed manually
Expand Down
14 changes: 4 additions & 10 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,15 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
case closed_event.type
when 'WORKFLOW_EXECUTION_COMPLETED'
payloads = closed_event.attributes.result
return ResultConverter.from_result_payloads(payloads)
return config.converter.from_result_payloads(payloads)
when 'WORKFLOW_EXECUTION_TIMED_OUT'
raise Temporal::WorkflowTimedOut
when 'WORKFLOW_EXECUTION_TERMINATED'
raise Temporal::WorkflowTerminated
when 'WORKFLOW_EXECUTION_CANCELED'
raise Temporal::WorkflowCanceled
when 'WORKFLOW_EXECUTION_FAILED'
raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure)
raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure, config.converter)
when 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW'
new_run_id = closed_event.attributes.new_execution_run_id
# Throw to let the caller know they're not getting the result
Expand Down Expand Up @@ -328,7 +328,7 @@ def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_
# for reference
# @param details [String, Array, nil] optional details to be stored in history
def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil)
namespace ||= Temporal.configuration.namespace
namespace ||= config.namespace

connection.terminate_workflow_execution(
namespace: namespace,
Expand All @@ -353,7 +353,7 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id)
run_id: run_id
)

Workflow::ExecutionInfo.generate_from(response.workflow_execution_info)
Workflow::ExecutionInfo.generate_from(response.workflow_execution_info, config.converter)
end

# Manually complete an activity
Expand Down Expand Up @@ -550,11 +550,6 @@ def connection
@connection ||= Temporal::Connection.generate(config.for_connection)
end

class ResultConverter
extend Concerns::Payloads
end
private_constant :ResultConverter

private

attr_reader :config
Expand Down Expand Up @@ -596,6 +591,5 @@ def validate_filter(filter, *allowed_filters)

raise ArgumentError, 'Only one filter is allowed' if filter.size > 1
end

end
end
86 changes: 0 additions & 86 deletions lib/temporal/concerns/payloads.rb

This file was deleted.

17 changes: 14 additions & 3 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
require 'temporal/connection/converter/payload/proto_json'
require 'temporal/connection/converter/composite'
require 'temporal/connection/converter/codec/chain'
require 'temporal/converter_wrapper'

module Temporal
class Configuration
Connection = Struct.new(:type, :host, :port, :credentials, :identity, keyword_init: true)
Connection = Struct.new(:type, :host, :port, :credentials, :identity, :converter, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true)

attr_reader :timeouts, :error_handlers, :capabilities
attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity,
attr_accessor :connection_type, :use_error_serialization_v2, :host, :port, :credentials, :identity,
:logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators,
:payload_codec, :legacy_signals, :no_signals_in_first_task

Expand Down Expand Up @@ -114,13 +115,23 @@ def timeouts=(new_timeouts)
@timeouts = DEFAULT_TIMEOUTS.merge(new_timeouts)
end

def converter
@converter_wrapper ||= ConverterWrapper.new(@converter, @payload_codec)
end

def converter=(new_converter)
@converter = new_converter
@converter_wrapper = nil
end

def for_connection
Connection.new(
type: connection_type,
host: host,
port: port,
credentials: credentials,
identity: identity || default_identity
identity: identity || default_identity,
converter: converter
).freeze
end

Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def self.generate(configuration)
credentials = configuration.credentials
identity = configuration.identity

connection_class.new(host, port, identity, credentials)
connection_class.new(host, port, identity, credentials, configuration.converter)
end
end
end
Loading
Loading