Skip to content

Commit

Permalink
Merge coinbase#73; update with client->connection refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
cvanderschuere committed Feb 14, 2022
2 parents 101ec13 + f984a3b commit 0f8d82c
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 14 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Temporal.configure do |config|
config.port = 7233
config.namespace = 'ruby-samples'
config.task_queue = 'hello-world'
config.credentials = :this_channel_is_insecure
end

begin
Expand Down Expand Up @@ -114,6 +115,24 @@ curl -O https://raw.githubusercontent.com/temporalio/docker-compose/main/docker-
docker-compose up
```

### Connecting via SSL

In many production deployments you will end up connecting to your Temporal Services via SSL. In which
case you must read the public cert of the CA that issued your Temporal server's SSL cert and create
an instance of GRPC Channel Credentials.

Configure your Temporal connection:

```ruby
Temporal.configure do |config|
config.host = 'temporal-prod.mycompany.com'
config.port = 7233
config.namespace = 'ruby-samples'
config.task_queue = 'hello-world'
config.credentials = GRPC::Core::ChannelCredentials.new(CA_CERT)
end
```

## Workflows

A workflow is defined using pure Ruby code, however it should contain only a high-level
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
timeout: timeout || max_timeout,
)
rescue GRPC::DeadlineExceeded => e
message = if timeout
message = if timeout
"Timed out after your specified limit of timeout: #{timeout} seconds"
else
"Timed out after #{max_timeout} seconds, which is the maximum supported amount."
Expand Down
7 changes: 4 additions & 3 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

module Temporal
class Configuration
Connection = Struct.new(:type, :host, :port, keyword_init: true)
Connection = Struct.new(:type, :host, :port, :credentials, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, keyword_init: true)

attr_reader :timeouts, :error_handlers
attr_writer :converter
attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :namespace, :task_queue, :headers
attr_accessor :connection_type, :host, :port, :credentials, :logger, :metrics_adapter, :namespace, :task_queue, :headers

# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
Expand Down Expand Up @@ -79,7 +79,8 @@ def for_connection
Connection.new(
type: connection_type,
host: host,
port: port
port: port,
credentials: credentials,
).freeze
end

Expand Down
10 changes: 9 additions & 1 deletion lib/temporal/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ def self.generate(configuration)
connection_class = CLIENT_TYPES_MAP[configuration.type]
host = configuration.host
port = configuration.port
credentials = :this_channel_is_insecure

unless configuration.credentials.nil?
credentials = configuration.credentials
end

hostname = `hostname`
thread_id = Thread.current.object_id
identity = "#{thread_id}@#{hostname}"

connection_class.new(host, port, identity)
if configuration.type == :grpc
connection_class.new(host, port, identity, credentials)
else
connection_class.new(host, port, identity)
end
end
end
9 changes: 5 additions & 4 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ class GRPC
close: Temporal::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
}.freeze

def initialize(host, port, identity)
def initialize(host, port, identity, credentials)
@url = "#{host}:#{port}"
@identity = identity
@credentials = credentials
@poll = true
@poll_mutex = Mutex.new
@poll_request = nil
Expand Down Expand Up @@ -133,7 +134,7 @@ def get_workflow_execution_history(
event_type: :all,
timeout: nil
)
if wait_for_new_event
if wait_for_new_event
if timeout.nil?
# This is an internal error. Wrappers should enforce this.
raise "You must specify a timeout when wait_for_new_event = true."
Expand Down Expand Up @@ -464,12 +465,12 @@ def cancel_polling_request

private

attr_reader :url, :identity, :poll_mutex, :poll_request
attr_reader :url, :credentials, :identity, :poll_mutex, :poll_request

def client
@client ||= Temporal::Api::WorkflowService::V1::WorkflowService::Stub.new(
url,
:this_channel_is_insecure,
credentials,
timeout: 60
)
end
Expand Down
10 changes: 5 additions & 5 deletions spec/unit/lib/temporal/grpc_client_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
describe Temporal::Connection::GRPC do
subject { Temporal::Connection::GRPC.new(nil, nil, nil) }
subject { Temporal::Connection::GRPC.new(nil, nil, nil, :this_channel_is_insecure) }
let(:grpc_stub) { double('grpc stub') }
let(:namespace) { 'test-namespace' }
let(:workflow_id) { SecureRandom.uuid }
Expand All @@ -8,7 +8,7 @@

before do
allow(subject).to receive(:client).and_return(grpc_stub)

allow(Time).to receive(:now).and_return(now)
end

Expand All @@ -35,7 +35,7 @@
end
end
end

describe '#signal_with_start_workflow' do
let(:temporal_response) do
Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionResponse.new(run_id: 'xxx')
Expand Down Expand Up @@ -122,7 +122,7 @@
end
end

it 'demands a timeout to be specified' do
it 'demands a timeout to be specified' do
expect do
subject.get_workflow_execution_history(
namespace: namespace,
Expand All @@ -135,7 +135,7 @@
end
end

it 'disallows a timeout larger than the server timeout' do
it 'disallows a timeout larger than the server timeout' do
expect do
subject.get_workflow_execution_history(
namespace: namespace,
Expand Down

0 comments on commit 0f8d82c

Please sign in to comment.