Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for server-side rate limiting of activities #46

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/temporal/activity/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def poll_loop
end

def poll_for_task
client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)
client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue, max_tasks_per_second: options[:max_tasks_per_second])
rescue StandardError => error
Temporal.logger.error("Unable to poll activity task queue: #{error.inspect}")
nil
Expand Down
5 changes: 4 additions & 1 deletion lib/temporal/client/grpc_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,15 @@ def respond_workflow_task_failed(task_token:, cause:, exception: nil)
client.respond_workflow_task_failed(request)
end

def poll_activity_task_queue(namespace:, task_queue:)
def poll_activity_task_queue(namespace:, task_queue:, max_tasks_per_second: nil)
request = Temporal::Api::WorkflowService::V1::PollActivityTaskQueueRequest.new(
identity: identity,
namespace: namespace,
task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(
name: task_queue
),
task_queue_meta: Temporal::Api::TaskQueue::V1::TaskQueueMetadata.new(
max_tasks_per_second: max_tasks_per_second
)
)

Expand Down
3 changes: 2 additions & 1 deletion lib/temporal/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Temporal
class Worker
# activity_thread_pool_size: number of threads that the poller can use to run activities.
# can be set to 1 if you want no paralellism in your activities, at the cost of throughput.
def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size])
def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], activity_max_tasks_per_second: nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a hash of options? There are quite a few options in the Go SDK for worker configuration that may be eventually added here. Changing this later would be a breaking change.

Both Java and Go SDKs name this "max task queue activities per second". Admittedly that does not match the field name in the proto. Because there is a distinction made between this limit and "max worker activities per second" too, it might make sense to add both options with explanatory comments while you're already in here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing this to an options hash later would not break the API. That would be backward compatible with the current kwargs. Unless you mean doing initialize(options: { ... }) but I can't see what the value is in doing that? Happy to change the option name to be closer to the Go SDK one. I don't think we should add the other option as part of this PR though, it would require adding code to do the rate limiting that isn't related to the server side rate limiting.

@workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new }
@activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new }
@pollers = []
Expand All @@ -18,6 +18,7 @@ def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OP
@shutting_down = false
@activity_poller_options = {
thread_pool_size: activity_thread_pool_size,
max_tasks_per_second: activity_max_tasks_per_second
}
end

Expand Down
2 changes: 1 addition & 1 deletion spec/unit/lib/temporal/activity/poller_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

expect(client)
.to have_received(:poll_activity_task_queue)
.with(namespace: namespace, task_queue: task_queue)
.with(namespace: namespace, task_queue: task_queue, max_tasks_per_second: nil)
.twice
end

Expand Down
24 changes: 20 additions & 4 deletions spec/unit/lib/temporal/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,12 @@ class TestWorkerActivity < Temporal::Activity

allow(Temporal::Activity::Poller)
.to receive(:new)
.with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 20})
.with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: nil, thread_pool_size: 20})
.and_return(activity_poller_1)

allow(Temporal::Activity::Poller)
.to receive(:new)
.with('default-namespace', 'other-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 20})
.with('default-namespace', 'other-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: nil, thread_pool_size: 20})
.and_return(activity_poller_2)

subject.register_workflow(TestWorkerWorkflow)
Expand All @@ -169,7 +169,7 @@ class TestWorkerActivity < Temporal::Activity
activity_poller = instance_double(Temporal::Activity::Poller, start: nil)
expect(Temporal::Activity::Poller)
.to receive(:new)
.with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 10})
.with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: nil, thread_pool_size: 10})
.and_return(activity_poller)

worker = Temporal::Worker.new(activity_thread_pool_size: 10)
Expand All @@ -180,7 +180,23 @@ class TestWorkerActivity < Temporal::Activity
worker.start

expect(activity_poller).to have_received(:start)
end

it 'can have an activity poller which throttles tasks per second' do
activity_poller = instance_double(Temporal::Activity::Poller, start: nil)
expect(Temporal::Activity::Poller)
.to receive(:new)
.with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: 15, thread_pool_size: 20})
.and_return(activity_poller)

worker = Temporal::Worker.new(activity_max_tasks_per_second: 15)
allow(worker).to receive(:shutting_down?).and_return(true)
worker.register_workflow(TestWorkerWorkflow)
worker.register_activity(TestWorkerActivity)

worker.start

expect(activity_poller).to have_received(:start)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'm not sure this check is needed in this spec, we're only checking that the Poller is initialised with the new option

end

context 'when middleware is configured' do
Expand Down Expand Up @@ -212,7 +228,7 @@ class TestWorkerActivity < Temporal::Activity

allow(Temporal::Activity::Poller)
.to receive(:new)
.with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [entry_2], thread_pool_size: 20)
.with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [entry_2], max_tasks_per_second: nil, thread_pool_size: 20)
.and_return(activity_poller_1)

subject.register_workflow(TestWorkerWorkflow)
Expand Down