Skip to content

Commit

Permalink
Add option for gRPC client connection retries (#270)
Browse files Browse the repository at this point in the history
* Allow passing channel args to GRPC connection

* Add config.connection_options hash

* Add option for client grpc connection retries

* Allow passing custom gRPC retry policy
  • Loading branch information
hughevans authored Feb 5, 2024
1 parent 3e0dae7 commit 65dfdb0
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 6 deletions.
8 changes: 5 additions & 3 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

module Temporal
class Configuration
Connection = Struct.new(:type, :host, :port, :credentials, :identity, keyword_init: true)
Connection = Struct.new(:type, :host, :port, :credentials, :identity, :connection_options, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, :search_attributes, keyword_init: true)

attr_reader :timeouts, :error_handlers, :capabilities
attr_accessor :connection_type, :converter, :use_error_serialization_v2, :host, :port, :credentials, :identity,
:logger, :metrics_adapter, :namespace, :task_queue, :headers, :search_attributes, :header_propagators,
:payload_codec, :legacy_signals, :no_signals_in_first_task
:payload_codec, :legacy_signals, :no_signals_in_first_task, :connection_options

# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
Expand Down Expand Up @@ -84,6 +84,7 @@ def initialize
@search_attributes = {}
@header_propagators = []
@capabilities = Capabilities.new(self)
@connection_options = {}

# Signals previously were incorrectly replayed in order within a workflow task window, rather
# than at the beginning. Correcting this changes the determinism of any workflow with signals.
Expand Down Expand Up @@ -120,7 +121,8 @@ def for_connection
host: host,
port: port,
credentials: credentials,
identity: identity || default_identity
identity: identity || default_identity,
connection_options: connection_options
).freeze
end

Expand Down
3 changes: 2 additions & 1 deletion lib/temporal/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ def self.generate(configuration)
port = configuration.port
credentials = configuration.credentials
identity = configuration.identity
options = configuration.connection_options

connection_class.new(host, port, identity, credentials)
connection_class.new(host, port, identity, credentials, options)
end
end
end
39 changes: 37 additions & 2 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require 'time'
require 'google/protobuf/well_known_types'
require 'securerandom'
require 'json'
require 'gen/temporal/api/filter/v1/message_pb'
require 'gen/temporal/api/workflowservice/v1/service_services_pb'
require 'gen/temporal/api/operatorservice/v1/service_services_pb'
Expand Down Expand Up @@ -795,11 +796,45 @@ def pause_schedule(namespace:, schedule_id:, should_pause:, note: nil)
attr_reader :url, :identity, :credentials, :options, :poll_mutex, :poll_request

def client
@client ||= Temporalio::Api::WorkflowService::V1::WorkflowService::Stub.new(
return @client if @client

channel_args = {}

if options[:keepalive_time_ms]
channel_args["grpc.keepalive_time_ms"] = options[:keepalive_time_ms]
end

if options[:retry_connection] || options[:retry_policy]
channel_args["grpc.enable_retries"] = 1

retry_policy = options[:retry_policy] || {
retryableStatusCodes: ["UNAVAILABLE"],
maxAttempts: 3,
initialBackoff: "0.1s",
backoffMultiplier: 2.0,
maxBackoff: "0.3s"
}

channel_args["grpc.service_config"] = ::JSON.generate(
methodConfig: [
{
name: [
{
service: "temporal.api.workflowservice.v1.WorkflowService",
}
],
retryPolicy: retry_policy
}
]
)
end

@client = Temporalio::Api::WorkflowService::V1::WorkflowService::Stub.new(
url,
credentials,
timeout: CONNECTION_TIMEOUT_SECONDS,
interceptors: [ClientNameVersionInterceptor.new]
interceptors: [ClientNameVersionInterceptor.new],
channel_args: channel_args
)
end

Expand Down
97 changes: 97 additions & 0 deletions spec/unit/lib/temporal/grpc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -830,4 +830,101 @@ class TestDeserializer
end
end
end

describe "passing in options" do
before do
allow(subject).to receive(:client).and_call_original
end

context "when keepalive_time_ms is passed" do
subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, keepalive_time_ms: 30_000) }

it "passes the option to the channel args" do
expect(Temporalio::Api::WorkflowService::V1::WorkflowService::Stub).to receive(:new).with(
":",
:this_channel_is_insecure,
timeout: 60,
interceptors: [instance_of(Temporal::Connection::ClientNameVersionInterceptor)],
channel_args: {
"grpc.keepalive_time_ms" => 30_000
}
)
subject.send(:client)
end
end

context "when passing retry_connection" do
subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, retry_connection: true) }

it "passes the option to the channel args" do
expect(Temporalio::Api::WorkflowService::V1::WorkflowService::Stub).to receive(:new).with(
":",
:this_channel_is_insecure,
timeout: 60,
interceptors: [instance_of(Temporal::Connection::ClientNameVersionInterceptor)],
channel_args: {
"grpc.enable_retries" => 1,
"grpc.service_config" => {
methodConfig: [
{
name: [
{
service: "temporal.api.workflowservice.v1.WorkflowService",
}
],
retryPolicy: {
retryableStatusCodes: ["UNAVAILABLE"],
maxAttempts: 3,
initialBackoff: "0.1s",
backoffMultiplier: 2.0,
maxBackoff: "0.3s"
}
}
]
}.to_json
}
)
subject.send(:client)
end
end

context "when passing a custom retry policy" do
subject { Temporal::Connection::GRPC.new(nil, nil, identity, :this_channel_is_insecure, retry_policy: retry_policy) }

let(:retry_policy) do
{
retryableStatusCodes: ["UNAVAILABLE", "INTERNAL"],
maxAttempts: 1,
initialBackoff: "0.2s",
backoffMultiplier: 1.0,
maxBackoff: "0.5s"
}
end

it "passes the policy to the channel args" do
expect(Temporalio::Api::WorkflowService::V1::WorkflowService::Stub).to receive(:new).with(
":",
:this_channel_is_insecure,
timeout: 60,
interceptors: [instance_of(Temporal::Connection::ClientNameVersionInterceptor)],
channel_args: {
"grpc.enable_retries" => 1,
"grpc.service_config" => {
methodConfig: [
{
name: [
{
service: "temporal.api.workflowservice.v1.WorkflowService",
}
],
retryPolicy: retry_policy
}
]
}.to_json
}
)
subject.send(:client)
end
end
end
end

0 comments on commit 65dfdb0

Please sign in to comment.