Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Remove deprecation warning #152

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
require 'workflows/call_failing_activity_workflow'

describe CallFailingActivityWorkflow, :integration do

class TestDeserializer
include Temporal::Concerns::Payloads
end

it 'correctly re-raises an activity-thrown exception in the workflow' do
workflow_id = SecureRandom.uuid
expected_message = "a failure message"
Expand Down
47 changes: 19 additions & 28 deletions examples/spec/integration/converter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,70 +3,61 @@
require 'grpc/errors'

describe 'Converter', :integration do
around(:each) do |example|
task_queue = Temporal.configuration.task_queue

Temporal.configure do |config|
let(:config) do
Temporal.configuration.dup.tap do |config|
config.task_queue = 'crypt'
config.converter = Temporal::CryptConverter.new(
payload_converter: Temporal::Configuration::DEFAULT_CONVERTER
)
end

example.run
ensure
Temporal.configure do |config|
config.task_queue = task_queue
config.converter = Temporal::Configuration::DEFAULT_CONVERTER
end
end
let(:client) { Temporal::Client.new(config) }

it 'can encrypt payloads' do
workflow_id, run_id = run_workflow(HelloWorldWorkflow, 'Tom')
workflow_id = SecureRandom.uuid
run_id = client.start_workflow(HelloWorldWorkflow, 'Tom', options: { workflow_id: workflow_id })

begin
wait_for_workflow_completion(workflow_id, run_id)
client.await_workflow_result(HelloWorldWorkflow, workflow_id: workflow_id, run_id: run_id)
rescue GRPC::DeadlineExceeded
raise "Encrypted-payload workflow didn't run. Make sure you run USE_ENCRYPTION=1 ./bin/worker and try again."
end

result = fetch_history(workflow_id, run_id)
history = client.get_workflow_history(namespace: config.namespace, workflow_id: workflow_id, run_id: run_id)

events = result.history.events.group_by(&:event_type)
events = history.events.group_by(&:type)

events[:EVENT_TYPE_WORKFLOW_EXECUTION_STARTED].map do |event|
input = event.workflow_execution_started_event_attributes.input
events['WORKFLOW_EXECUTION_STARTED'].map do |event|
input = event.attributes.input
input.payloads.each do |payload|
expect(payload.metadata['encoding']).to eq('binary/encrypted')
end
end

events[:EVENT_TYPE_ACTIVITY_TASK_SCHEDULED].map do |event|
input = event.activity_task_scheduled_event_attributes.input
events['ACTIVITY_TASK_SCHEDULED'].map do |event|
input = event.attributes.input
input.payloads.each do |payload|
expect(payload.metadata['encoding']).to eq('binary/encrypted')
end
end

events[:EVENT_TYPE_ACTIVITY_TASK_COMPLETED].map do |event|
result = event.activity_task_completed_event_attributes.result
events['ACTIVITY_TASK_COMPLETED'].map do |event|
result = event.attributes.result
result.payloads.each do |payload|
expect(payload.metadata['encoding']).to eq('binary/encrypted')
end
end

events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].map do |event|
result = event.workflow_execution_completed_event_attributes.result
events['WORKFLOW_EXECUTION_COMPLETED'].map do |event|
result = event.attributes.result
result.payloads.each do |payload|
expect(payload.metadata['encoding']).to eq('binary/encrypted')
end
end

completion_event = events[:EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED].first
result = completion_event.workflow_execution_completed_event_attributes.result

converter = Temporal.configuration.converter
completion_event = events['WORKFLOW_EXECUTION_COMPLETED'].first
result = completion_event.attributes.result

expect(converter.from_payloads(result)&.first).to eq('Hello World, Tom')
expect(config.converter.from_payloads(result)&.first).to eq('Hello World, Tom')
end
end
7 changes: 2 additions & 5 deletions lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@
require 'temporal/error_handler'
require 'temporal/errors'
require 'temporal/activity/context'
require 'temporal/concerns/payloads'
require 'temporal/connection/retryer'
require 'temporal/connection'

module Temporal
class Activity
class TaskProcessor
include Concerns::Payloads

def initialize(task, namespace, activity_lookup, middleware_chain, config)
@task = task
@namespace = namespace
@metadata = Metadata.generate_activity_metadata(task, namespace)
@metadata = Metadata.generate_activity_metadata(task, namespace, config.converter)
@task_token = task.task_token
@activity_name = task.activity_type.name
@activity_class = activity_lookup.find(activity_name)
Expand All @@ -35,7 +32,7 @@ def process
end

result = middleware_chain.invoke(metadata) do
activity_class.execute_in_context(context, from_payloads(task.input))
activity_class.execute_in_context(context, config.converter.from_payloads(task.input))
end

# Do not complete asynchronous activities, these should be completed manually
Expand Down
15 changes: 5 additions & 10 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,15 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
case closed_event.type
when 'WORKFLOW_EXECUTION_COMPLETED'
payloads = closed_event.attributes.result
return ResultConverter.from_result_payloads(payloads)
return config.converter.from_result_payloads(payloads)
when 'WORKFLOW_EXECUTION_TIMED_OUT'
raise Temporal::WorkflowTimedOut
when 'WORKFLOW_EXECUTION_TERMINATED'
raise Temporal::WorkflowTerminated
when 'WORKFLOW_EXECUTION_CANCELED'
raise Temporal::WorkflowCanceled
when 'WORKFLOW_EXECUTION_FAILED'
raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure)
raise Temporal::Workflow::Errors.generate_error(closed_event.attributes.failure, config.converter)
when 'WORKFLOW_EXECUTION_CONTINUED_AS_NEW'
new_run_id = closed_event.attributes.new_execution_run_id
# Throw to let the caller know they're not getting the result
Expand Down Expand Up @@ -284,7 +284,7 @@ def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_
# for reference
# @param details [String, Array, nil] optional details to be stored in history
def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil)
namespace ||= Temporal.configuration.namespace
namespace ||= config.namespace

connection.terminate_workflow_execution(
namespace: namespace,
Expand All @@ -309,7 +309,7 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id)
run_id: run_id
)

Workflow::ExecutionInfo.generate_from(response.workflow_execution_info)
Workflow::ExecutionInfo.generate_from(response.workflow_execution_info, config.converter)
end

# Manually complete an activity
Expand Down Expand Up @@ -375,11 +375,6 @@ def list_closed_workflow_executions(namespace, from, to = Time.now, filter: {})
fetch_executions(:closed, { namespace: namespace, from: from, to: to }.merge(filter))
end

class ResultConverter
extend Concerns::Payloads
end
private_constant :ResultConverter

private

attr_reader :config
Expand Down Expand Up @@ -450,7 +445,7 @@ def fetch_executions(status, request_options)
end

executions.map do |raw_execution|
Temporal::Workflow::ExecutionInfo.generate_from(raw_execution)
Temporal::Workflow::ExecutionInfo.generate_from(raw_execution, config.converter)
end
end
end
Expand Down
59 changes: 0 additions & 59 deletions lib/temporal/concerns/payloads.rb

This file was deleted.

14 changes: 10 additions & 4 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
require 'temporal/connection/converter/payload/bytes'
require 'temporal/connection/converter/payload/json'
require 'temporal/connection/converter/composite'
require 'temporal/converter_wrapper'

module Temporal
class Configuration
Connection = Struct.new(:type, :host, :port, keyword_init: true)
Connection = Struct.new(:type, :host, :port, :converter, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, keyword_init: true)

attr_reader :timeouts, :error_handlers
attr_writer :converter
attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :namespace, :task_queue, :headers

# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
Expand Down Expand Up @@ -72,14 +72,20 @@ def timeouts=(new_timeouts)
end

def converter
@converter
@converter_wrapper ||= ConverterWrapper.new(@converter)
end

def converter=(new_converter)
@converter = new_converter
@converter_wrapper = nil
end

def for_connection
Connection.new(
type: connection_type,
host: host,
port: port
port: port,
converter: converter
).freeze
end

Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def self.generate(configuration)
thread_id = Thread.current.object_id
identity = "#{thread_id}@#{hostname}"

connection_class.new(host, port, identity)
connection_class.new(host, port, identity, configuration.converter)
end
end
end