Add Cdo::Throttle module #38142
**`Cdo::Throttle` module** (new file, +45 lines):

```ruby
require 'cdo/shared_cache'
require 'dynamic_config/dcdo'

module Cdo
  module Throttle
    CACHE_PREFIX = "cdo_throttle/".freeze

    # @param [String] id - Unique identifier to throttle on.
    # @param [Integer] limit - Number of requests allowed over period.
    # @param [Integer] period - Period of time in seconds.
    # @param [Integer] throttle_for - How long id should stay throttled, in seconds.
    #   Optional; defaults to Cdo::Throttle.throttle_time.
    # @return [Boolean] Whether or not the request should be throttled.
    def self.throttle(id, limit, period, throttle_for = throttle_time)
      full_key = CACHE_PREFIX + id.to_s
      value = CDO.shared_cache.read(full_key) || empty_value
      now = Time.now.utc
      value[:request_timestamps] << now

      if value[:throttled_until]&.future?
        should_throttle = true
      else
        value[:throttled_until] = nil
        earliest = now - period
        value[:request_timestamps].select! {|timestamp| timestamp >= earliest}

        should_throttle = value[:request_timestamps].size > limit
        value[:throttled_until] = now + throttle_for if should_throttle
      end

      CDO.shared_cache.write(full_key, value)

      should_throttle
    end

    def self.empty_value
      {
        throttled_until: nil,
        request_timestamps: []
      }
    end

    def self.throttle_time
      DCDO.get('throttle_time_default', 60)
    end
  end
end
```

**Review discussion (on the timestamp filtering):**

> **Reviewer:** I'm slightly worried about this. What kind of limit values do you think we will see in practice? Throttling functions typically need to be really fast, and this design grows linearly in time and space.

> **Author:** Yeah, this is the part that I feel weird about as well. The limit values are currently 100 requests / 60 seconds for identifiable users and 1000 requests / 60 seconds for unidentifiable users. The limits are configurable via DCDO values for each consumer. When a consumer is throttled, I log to Honeybadger so we're notified and know if we need to adjust those limits.
>
> Do you have any thoughts on how to improve this? I'm trying to keep this array of timestamps small by evicting any values outside of the period (currently 60 seconds for all consumers) to mitigate, but there may be a better way to do this, or to track the requests differently and avoid this problem entirely.

> **Reviewer:** One way to solve this is to track counts in buckets instead of the exact timestamps. Each bucket would represent the number of requests that arrived during a particular window (say, 5 seconds), and you could sum up enough buckets to get the count for the interval that you want. Old buckets would automatically age out of the interval as time progresses.

**Review discussion (on the shared-cache write):**

> **Reviewer:** There's a race condition here if several different processes are all reading from and writing to the value at the same time, which will undercount the number of requests. (That might be acceptable given the use case, but it would be a good limitation to note.)

> **Author:** Ah, yeah, you're right. I think it's acceptable for the current consumers (the client is caching responses and the server is doing some caching as well, so this is only called for unique/uncached requests), but I'd like to fix this to future-proof it. I think the solution would be to implement a mutex or a queue here, but are there other/better solutions? I worry about a mutex because I think it would have to lock for everybody (rather than locking per ID), but maybe that's okay; I'm not super familiar with implementing/using locks in this way.

> **Reviewer:** If we're using Redis, then this could be a good way to implement it: https://redislabs.com/redis-best-practices/basic-rate-limiting/. (The behavior would be a bit different as you've defined it here, though.) A local lock probably wouldn't help because it's still local to an instance, and a distributed lock is difficult to implement. Two viable patterns are atomic operations (such as increment) at the cache server, or optimistic concurrency (where you make sure the value that you're writing hasn't changed since you read it).
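The reviewer's bucketed-counter suggestion can be sketched as follows. This is a minimal local illustration, not the PR's implementation: `BucketedCounter` and the 5-second bucket size are assumptions for the example, and a shared-cache version would persist the bucket hash the same way the module persists timestamps today. Memory stays bounded at `period / bucket_size` counters no matter how much traffic arrives.

```ruby
# Sketch of a bucketed sliding-window counter (hypothetical; not in the PR).
# Instead of one timestamp per request, keep one counter per time bucket.
class BucketedCounter
  def initialize(period:, bucket_size: 5)
    @period = period            # sliding window length, in seconds
    @bucket_size = bucket_size  # resolution of the approximation
    @buckets = {}               # bucket start (epoch seconds) => request count
  end

  # Record one request at time `now` (epoch seconds) and return the
  # approximate request count over the trailing window, including this one.
  def record(now = Time.now.to_i)
    key = now - (now % @bucket_size)        # truncate to the bucket boundary
    @buckets[key] = @buckets.fetch(key, 0) + 1

    # Evict buckets that have aged entirely out of the window.
    oldest = now - @period
    @buckets.delete_if {|start, _| start + @bucket_size <= oldest}

    @buckets.values.sum
  end
end
```

The trade-off is precision: a request is counted for the whole bucket it landed in, so the effective window is accurate only to within one `bucket_size`. For limits like 100/60s that coarseness is usually acceptable.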
**`ThrottleTest`** (new file, +37 lines):

```ruby
require_relative '../test_helper'
require 'cdo/throttle'
require 'timecop'

class ThrottleTest < Minitest::Test
  def teardown
    CDO.shared_cache.clear
  end

  def test_throttle_with_limit_1
    Timecop.freeze
    refute Cdo::Throttle.throttle("my_key", 1, 2) # 1/1 reqs per 2s - not throttled
    Timecop.travel(Time.now.utc + 1)
    assert Cdo::Throttle.throttle("my_key", 1, 2) # 2/1 reqs per 2s - throttled
    Timecop.travel(Time.now.utc + Cdo::Throttle.throttle_time - 1)
    assert Cdo::Throttle.throttle("my_key", 1, 2) # still throttled
    Timecop.travel(Time.now.utc + Cdo::Throttle.throttle_time)
    refute Cdo::Throttle.throttle("my_key", 1, 2) # 1/1 reqs per 2s after waiting - not throttled anymore
    Timecop.travel(Time.now.utc + 1)
    assert Cdo::Throttle.throttle("my_key", 1, 2) # 2/1 reqs per 2s - throttled again
  end

  def test_throttle_with_limit_greater_than_1
    Timecop.freeze
    refute Cdo::Throttle.throttle("my_key", 2, 2) # 1/2 reqs per 2s - not throttled
    Timecop.travel(Time.now.utc + 1)
    refute Cdo::Throttle.throttle("my_key", 2, 2) # 2/2 reqs per 2s - not throttled
    Timecop.travel(Time.now.utc + 0.5)
    assert Cdo::Throttle.throttle("my_key", 2, 2) # 3/2 reqs per 2s - throttled
    Timecop.travel(Time.now.utc + Cdo::Throttle.throttle_time)
    refute Cdo::Throttle.throttle("my_key", 2, 2) # 1/2 reqs per 2s after waiting - not throttled anymore
    Timecop.travel(Time.now.utc + 1)
    refute Cdo::Throttle.throttle("my_key", 2, 2) # 2/2 reqs per 2s - not throttled
    Timecop.travel(Time.now.utc + 0.5)
    assert Cdo::Throttle.throttle("my_key", 2, 2) # 3/2 reqs per 2s - throttled again
  end
end
```
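For context on how the boolean return value would be consumed, a caller would key the throttle per client and map the result to an HTTP response. The sketch below is hypothetical (the PR itself doesn't show its consumers): `handle_request`, the `"my_service/"` key prefix, and the in-memory `throttle` stub stand in for a real route handler and `Cdo::Throttle.throttle`, which needs the shared cache to run.

```ruby
# Hypothetical consumer sketch; names are illustrative, not from the PR.
# A simple in-memory stub replaces Cdo::Throttle.throttle so this runs standalone.
REQUESTS = Hash.new(0)

def throttle(id, limit, _period)
  REQUESTS[id] += 1
  REQUESTS[id] > limit   # true once the caller exceeds the limit
end

# Key the throttle per user so one noisy client can't throttle everyone,
# and answer 429 Too Many Requests when the limit is exceeded.
def handle_request(user_id)
  if throttle("my_service/#{user_id}", 2, 60)
    {status: 429, body: "Too Many Requests"}
  else
    {status: 200, body: "ok"}
  end
end
```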
**Review discussion (shared-cache availability):**

> **Reviewer:** Does this result in a network call? How is failure handled (e.g. if the shared cache is down)?

> **Author:** Yes, this will result in a network call if `CDO.shared_cache` is using ElastiCache (it's the default, but it falls back to a FileStore cache if initializing ElastiCache fails). If I'm reading the implementation correctly, `nil` will be returned on failure: Rails' implementation of `read` calls the subclass's implementation of `read_entry`, which is the ElastiCache implementation when ElastiCache is in use.

> **Reviewer:** I think we may want to make this more explicit and just bypass the throttler if ElastiCache is temporarily unavailable?
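The "bypass if unavailable" idea amounts to a fail-open wrapper: if the cache path raises, treat the request as not throttled rather than erroring out. The sketch below is an assumption about how that might look, not code from the PR; `throttle_with_fallback` is a hypothetical name, and `FlakyThrottle` is a stub that simulates a cache outage so the example runs standalone.

```ruby
# Stand-in for Cdo::Throttle that simulates an unreachable shared cache.
module FlakyThrottle
  def self.throttle(_id, _limit, _period)
    raise IOError, "shared cache unavailable"
  end
end

# Hypothetical fail-open wrapper: availability of the product wins over
# strict rate limiting when the cache backend is down.
def throttle_with_fallback(id, limit, period, backend: FlakyThrottle)
  backend.throttle(id, limit, period)
rescue StandardError
  # Cache unreachable: fail open (don't throttle). A real version would
  # also report here (e.g. to Honeybadger, per the PR discussion) so
  # operators know throttling is degraded.
  false
end
```

The design choice here is fail-open vs. fail-closed: failing open keeps the service usable during a cache outage at the cost of temporarily unenforced limits, which matches the reviewer's suggestion to bypass the throttler.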