diff --git a/Gemfile b/Gemfile index a65cdd918be..f9f49de3a3c 100644 --- a/Gemfile +++ b/Gemfile @@ -19,6 +19,7 @@ gem 'kubeclient' gem 'loggregator_emitter', '~> 5.0' gem 'membrane', '~> 1.0' gem 'mime-types', '~> 3.4' +gem 'mock_redis' gem 'multi_json' gem 'multipart-parser' gem 'net-ssh' @@ -31,6 +32,7 @@ gem 'posix-spawn', '~> 0.3.15' gem 'public_suffix' gem 'psych', '>= 4.0.4' gem 'rake' +gem 'redis' gem 'retryable' gem 'rfc822' gem 'rubyzip', '>= 1.3.0' diff --git a/Gemfile.lock b/Gemfile.lock index 523271ac550..5b8467ef214 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -119,6 +119,7 @@ GEM simplecov (<= 0.13) coderay (1.1.3) concurrent-ruby (1.2.2) + connection_pool (2.3.0) cookiejar (0.3.3) crack (0.4.5) rexml @@ -308,6 +309,8 @@ GEM mini_mime (1.1.2) mini_portile2 (2.8.2) minitest (5.18.1) + mock_redis (0.34.0) + ruby2_keywords ms_rest (0.6.4) concurrent-ruby (~> 1.0) faraday (~> 0.9) @@ -385,6 +388,10 @@ GEM ffi (~> 1.0) rbs (2.8.4) recursive-open-struct (1.1.3) + redis (5.0.5) + redis-client (>= 0.9.0) + redis-client (0.11.1) + connection_pool regexp_parser (2.8.1) representable (3.2.0) declarative (< 0.1.0) @@ -580,6 +587,7 @@ DEPENDENCIES machinist (~> 1.0.6) membrane (~> 1.0) mime-types (~> 3.4) + mock_redis multi_json multipart-parser mysql2 (~> 0.5.5) @@ -600,6 +608,7 @@ DEPENDENCIES rack-test railties (~> 6.1.7, >= 6.1.7.3) rake + redis retryable rfc822 roodi diff --git a/lib/cloud_controller/config_schemas/base/api_schema.rb b/lib/cloud_controller/config_schemas/base/api_schema.rb index 35e751011ba..0fa09d26ae6 100644 --- a/lib/cloud_controller/config_schemas/base/api_schema.rb +++ b/lib/cloud_controller/config_schemas/base/api_schema.rb @@ -97,6 +97,10 @@ class ApiSchema < VCAP::Config optional(:ca_cert_path) => String, }, + optional(:redis) => { + socket: String + }, + staging: { timeout_in_seconds: Integer, minimum_staging_memory_mb: Integer, diff --git a/middleware/base_rate_limiter.rb b/middleware/base_rate_limiter.rb index 0ef52c451dc..10cce1b15f6 100644 --- a/middleware/base_rate_limiter.rb +++ b/middleware/base_rate_limiter.rb @@ -1,30 +1,72 @@ require 'mixins/client_ip' require 'mixins/user_reset_interval' +require 'redis' module CloudFoundry module Middleware class ExpiringRequestCounter include CloudFoundry::Middleware::UserResetInterval - Counter = Struct.new(:value, :expires_at) - - def initialize(key_prefix) + def initialize(key_prefix, redis_connection_pool_size: nil) @key_prefix = key_prefix - @mutex = Mutex.new - @data = {} + @redis_connection_pool_size = redis_connection_pool_size end def increment(user_guid, reset_interval_in_minutes, logger) key = "#{@key_prefix}:#{user_guid}" expires_in = next_expires_in(user_guid, reset_interval_in_minutes) - @mutex.synchronize do - if !@data.key?(key) || (ttl = @data[key].expires_at - Time.now.to_i) <= 0 # not existing or expired - @data[key] = Counter.new(1, Time.now.to_i + expires_in) - [1, expires_in] - else - [@data[key].value += 1, ttl] + store.increment(key, expires_in, logger) + end + + private + + def store + return @store if defined?(@store) + + redis_socket = VCAP::CloudController::Config.config.get(:redis, :socket) + @store = redis_socket.nil? ? InMemoryStore.new : RedisStore.new(redis_socket, @redis_connection_pool_size) + end + + class InMemoryStore + Counter = Struct.new(:value, :expires_at) + + def initialize + @mutex = Mutex.new + @data = {} + end + + def increment(key, expires_in, _) + @mutex.synchronize do + if !@data.key?(key) || (ttl = @data[key].expires_at - Time.now.to_i) <= 0 # not existing or expired + @data[key] = Counter.new(1, Time.now.to_i + expires_in) + [1, expires_in] + else + [@data[key].value += 1, ttl] + end + end + end + end + + class RedisStore + def initialize(socket, connection_pool_size) + connection_pool_size ||= VCAP::CloudController::Config.config.get(:puma, :max_threads) || 1 + @redis = ConnectionPool::Wrapper.new(size: connection_pool_size) do + Redis.new(timeout: 1, path: socket) end end + + def increment(key, expires_in, logger) + _, count_str, ttl_int = @redis.multi do |transaction| + transaction.set(key, 0, ex: expires_in, nx: true) # nx => set only if not exists + transaction.incr(key) + transaction.ttl(key) + end + + [count_str.to_i, ttl_int] + rescue Redis::BaseError => e + logger.error("Redis error: #{e.inspect}") + [1, expires_in] + end end end diff --git a/middleware/service_broker_rate_limiter.rb b/middleware/service_broker_rate_limiter.rb index 18a74b5e1bd..e45838a66c6 100644 --- a/middleware/service_broker_rate_limiter.rb +++ b/middleware/service_broker_rate_limiter.rb @@ -1,26 +1,77 @@ require 'concurrent-ruby' +require 'redis' module CloudFoundry module Middleware class ConcurrentRequestCounter - def initialize(key_prefix) + def initialize(key_prefix, redis_connection_pool_size: nil) @key_prefix = key_prefix - @mutex = Mutex.new - @data = {} + @redis_connection_pool_size = redis_connection_pool_size end def try_increment?(user_guid, max_concurrent_requests, logger) key = "#{@key_prefix}:#{user_guid}" - @mutex.synchronize do - @data[key] = Concurrent::Semaphore.new(max_concurrent_requests) unless @data.key?(key) - @data[key].try_acquire - end + store.try_increment?(key, max_concurrent_requests, logger) end def decrement(user_guid, logger) key = "#{@key_prefix}:#{user_guid}" - @mutex.synchronize do - @data[key].release if @data.key?(key) + store.decrement(key, logger) + end + + private + + def store + return @store if defined?(@store) + + redis_socket = VCAP::CloudController::Config.config.get(:redis, :socket) + @store = redis_socket.nil? ? InMemoryStore.new : RedisStore.new(redis_socket, @redis_connection_pool_size) + end + + class InMemoryStore + def initialize + @mutex = Mutex.new + @data = {} + end + + def try_increment?(key, max_concurrent_requests, _) + @mutex.synchronize do + @data[key] = Concurrent::Semaphore.new(max_concurrent_requests) unless @data.key?(key) + @data[key].try_acquire + end + end + + def decrement(key, _) + @mutex.synchronize do + @data[key].release if @data.key?(key) + end + end + end + + class RedisStore + def initialize(socket, connection_pool_size) + connection_pool_size ||= VCAP::CloudController::Config.config.get(:puma, :max_threads) || 1 + @redis = ConnectionPool::Wrapper.new(size: connection_pool_size) do + Redis.new(timeout: 1, path: socket) + end + end + + def try_increment?(key, max_concurrent_requests, logger) + count_str = @redis.incr(key) + return true if count_str.to_i <= max_concurrent_requests + + @redis.decr(key) + false + rescue Redis::BaseError => e + logger.error("Redis error: #{e.inspect}") + true + end + + def decrement(key, logger) + count_str = @redis.decr(key) + @redis.incr(key) if count_str.to_i < 0 + rescue Redis::BaseError => e + logger.error("Redis error: #{e.inspect}") end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index c88c5749539..85a5ed6726d 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,5 +1,6 @@ SPEC_HELPER_LOADED = true require 'rubygems' +require 'mock_redis' begin require 'spork' @@ -169,6 +170,9 @@ VCAP::CloudController::SecurityContext.clear allow_any_instance_of(VCAP::CloudController::UaaTokenDecoder).to receive(:uaa_issuer).and_return(UAAIssuer::ISSUER) + + mock_redis = MockRedis.new + allow(Redis).to receive(:new).and_return(mock_redis) end rspec_config.around :each do |example| diff --git a/spec/unit/middleware/rate_limiter_spec.rb b/spec/unit/middleware/rate_limiter_spec.rb index d2860e95b1d..488076559d1 100644 --- a/spec/unit/middleware/rate_limiter_spec.rb +++ b/spec/unit/middleware/rate_limiter_spec.rb @@ -323,42 +323,98 @@ module Middleware end RSpec.describe ExpiringRequestCounter do - let(:expiring_request_counter) { ExpiringRequestCounter.new('test') } - let(:stubbed_expires_in) { 30.minutes.to_i } - let(:user_guid) { SecureRandom.uuid } - let(:reset_interval_in_minutes) { 60 } - let(:logger) { double('logger') } - - before do - allow(expiring_request_counter).to receive(:next_expires_in).and_return(stubbed_expires_in) - end + store_implementations = [] + store_implementations << :in_memory # Test the ExpiringRequestCounter::InMemoryStore + store_implementations << :mock_redis # Test the ExpiringRequestCounter::RedisStore with MockRedis + + store_implementations.each do |store_implementation| + describe store_implementation do + let(:expiring_request_counter) { ExpiringRequestCounter.new('test') } + let(:stubbed_expires_in) { 30.minutes.to_i } + let(:store) { expiring_request_counter.instance_variable_get(:@store) } + let(:user_guid) { SecureRandom.uuid } + let(:reset_interval_in_minutes) { 60 } + let(:logger) { double('logger') } - describe '#initialize' do - it 'sets the @key_prefix' do - expect(expiring_request_counter.instance_variable_get(:@key_prefix)).to eq('test') - end - end + before do + TestConfig.override(redis: { socket: 'foo' }, puma: { max_threads: 123 }) unless store_implementation == :in_memory - describe '#increment' do - it 'calls next_expires_in with the given user guid and reset interval' do - expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger) - expect(expiring_request_counter).to have_received(:next_expires_in).with(user_guid, reset_interval_in_minutes) - end + allow(ConnectionPool::Wrapper).to receive(:new).and_call_original + expiring_request_counter.send(:store) # instantiate counter and store implementation + allow(expiring_request_counter).to receive(:next_expires_in).and_return(stubbed_expires_in) + end - it 'returns count=1 and expires_in for a new user' do - count, expires_in = expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger) - expect(count).to eq(1) - expect(expires_in).to eq(stubbed_expires_in) - end + describe '#initialize' do + it 'sets the @key_prefix' do + expect(expiring_request_counter.instance_variable_get(:@key_prefix)).to eq('test') + end - it 'returns count=2 and expires_in minus the elapsed time for a recurring user' do - expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger) + it 'instantiates the appropriate store class' do + if store_implementation == :in_memory + expect(store).to be_kind_of(ExpiringRequestCounter::InMemoryStore) + else + expect(store).to be_kind_of(ExpiringRequestCounter::RedisStore) + end + end + + it 'uses a connection pool size that equals the maximum puma threads' do + skip('Not relevant for InMemoryStore') if store_implementation == :in_memory + + expect(ConnectionPool::Wrapper).to have_received(:new).with(size: 123) + end - elapsed_seconds = 10 - Timecop.travel(Time.now + elapsed_seconds.seconds) do - count, expires_in = expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger) - expect(count).to eq(2) - expect(expires_in).to eq(stubbed_expires_in - elapsed_seconds) + context 'with custom connection pool size' do + let(:expiring_request_counter) { ExpiringRequestCounter.new('test', redis_connection_pool_size: 456) } + + it 'uses the provided connection pool size' do + skip('Not relevant for InMemoryStore') if store_implementation == :in_memory + + expect(ConnectionPool::Wrapper).to have_received(:new).with(size: 456) + end + end + end + + describe '#increment' do + it 'calls next_expires_in with the given user guid and reset interval' do + expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger) + expect(expiring_request_counter).to have_received(:next_expires_in).with(user_guid, reset_interval_in_minutes) + end + + it 'calls @store.increment with the prefixed user guid, stubbed expires_in and given logger' do + allow(store).to receive(:increment).and_call_original + expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger) + expect(store).to have_received(:increment).with("test:#{user_guid}", stubbed_expires_in, logger) + end + + it 'returns count=1 and expires_in for a new user' do + count, expires_in = expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger) + expect(count).to eq(1) + expect(expires_in).to eq(stubbed_expires_in) + end + + it 'returns count=2 and expires_in minus the elapsed time for a recurring user' do + expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger) + + elapsed_seconds = 10 + Timecop.travel(Time.now + elapsed_seconds.seconds) do + count, expires_in = expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger) + expect(count).to eq(2) + expect(expires_in).to eq(stubbed_expires_in - elapsed_seconds) + end + end + + it 'returns count=1 and expires_in in case of a Redis error' do + skip('Not relevant for InMemoryStore') if store_implementation == :in_memory + + allow_any_instance_of(MockRedis::TransactionWrapper).to receive(:multi).and_raise(Redis::ConnectionError) + allow_any_instance_of(Redis).to receive(:multi).and_raise(Redis::ConnectionError) + allow(logger).to receive(:error) + + count, expires_in = expiring_request_counter.increment(user_guid, reset_interval_in_minutes, logger) + expect(count).to eq(1) + expect(expires_in).to eq(stubbed_expires_in) + expect(logger).to have_received(:error).with(/Redis error/) + end end end end diff --git a/spec/unit/middleware/service_broker_rate_limiter_spec.rb b/spec/unit/middleware/service_broker_rate_limiter_spec.rb index 1cc065bbadf..265fde1dde3 100644 --- a/spec/unit/middleware/service_broker_rate_limiter_spec.rb +++ b/spec/unit/middleware/service_broker_rate_limiter_spec.rb @@ -156,54 +156,124 @@ module Middleware end RSpec.describe ConcurrentRequestCounter do - let(:concurrent_request_counter) { ConcurrentRequestCounter.new('test') } - let(:user_guid) { SecureRandom.uuid } - let(:max_concurrent_requests) { 5 } - let(:logger) { double('logger', info: nil) } + store_implementations = [] + store_implementations << :in_memory # Test the ConcurrentRequestCounter::InMemoryStore + store_implementations << :mock_redis # Test the ConcurrentRequestCounter::RedisStore with MockRedis + + store_implementations.each do |store_implementation| + describe store_implementation do + let(:concurrent_request_counter) { ConcurrentRequestCounter.new('test') } + let(:store) { concurrent_request_counter.instance_variable_get(:@store) } + let(:user_guid) { SecureRandom.uuid } + let(:max_concurrent_requests) { 5 } + let(:logger) { double('logger', info: nil) } - before do - concurrent_request_counter # instantiate counter - end + before do + TestConfig.override(redis: { socket: 'foo' }, puma: { max_threads: 123 }) unless store_implementation == :in_memory - describe '#initialize' do - it 'sets the @key_prefix' do - expect(concurrent_request_counter.instance_variable_get(:@key_prefix)).to eq('test') - end - end + allow(ConnectionPool::Wrapper).to receive(:new).and_call_original + concurrent_request_counter.send(:store) # instantiate counter and store implementation + end - describe '#try_increment?' do - it 'returns true for a new user' do - expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_truthy - end + describe '#initialize' do + it 'sets the @key_prefix' do + expect(concurrent_request_counter.instance_variable_get(:@key_prefix)).to eq('test') + end - it 'returns true for a recurring user performing the maximum allowed concurrent requests' do - (max_concurrent_requests - 1).times { concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) } - expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_truthy - end + it 'instantiates the appropriate store class' do + if store_implementation == :in_memory + expect(store).to be_kind_of(ConcurrentRequestCounter::InMemoryStore) + else + expect(store).to be_kind_of(ConcurrentRequestCounter::RedisStore) + end + end - it 'returns false for a recurring user with too many concurrent requests' do - max_concurrent_requests.times { concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) } - expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_falsey - end + it 'uses a connection pool size that equals the maximum puma threads' do + skip('Not relevant for InMemoryStore') if store_implementation == :in_memory - it 'returns true again for a recurring user after a single decrement' do - (max_concurrent_requests + 1).times { concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) } - concurrent_request_counter.decrement(user_guid, logger) - expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_truthy - end - end + expect(ConnectionPool::Wrapper).to have_received(:new).with(size: 123) + end - describe '#decrement' do - it 'decreases the number of concurrent requests, allowing for another concurrent request' do - max_concurrent_requests.times { concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) } - concurrent_request_counter.decrement(user_guid, logger) - expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_truthy - end + context 'with custom connection pool size' do + let(:concurrent_request_counter) { ConcurrentRequestCounter.new('test', redis_connection_pool_size: 456) } + + it 'uses the provided connection pool size' do + skip('Not relevant for InMemoryStore') if store_implementation == :in_memory + + expect(ConnectionPool::Wrapper).to have_received(:new).with(size: 456) + end + end + end + + describe '#try_increment?' do + it 'calls @store.try_increment? with the prefixed user guid and the given maximum concurrent requests and logger' do + allow(store).to receive(:try_increment?).and_call_original + concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) + expect(store).to have_received(:try_increment?).with("test:#{user_guid}", max_concurrent_requests, logger) + end - it 'does not decrease the number of concurrent requests below zero' do - concurrent_request_counter.decrement(user_guid, logger) - max_concurrent_requests.times { concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) } - expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_falsey + it 'returns true for a new user' do + expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_truthy + end + + it 'returns true for a recurring user performing the maximum allowed concurrent requests' do + (max_concurrent_requests - 1).times { concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) } + expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_truthy + end + + it 'returns false for a recurring user with too many concurrent requests' do + max_concurrent_requests.times { concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) } + expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_falsey + end + + it 'returns true again for a recurring user after a single decrement' do + (max_concurrent_requests + 1).times { concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) } + concurrent_request_counter.decrement(user_guid, logger) + expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_truthy + end + + it 'returns true in case of a Redis error' do + skip('Not relevant for InMemoryStore') if store_implementation == :in_memory + + allow_any_instance_of(MockRedis::StringMethods).to receive(:incr).and_raise(Redis::ConnectionError) + allow_any_instance_of(Redis).to receive(:incr).and_raise(Redis::ConnectionError) + allow(logger).to receive(:error) + + expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_truthy + expect(logger).to have_received(:error).with(/Redis error/) + end + end + + describe '#decrement' do + it 'calls @store.decrement with the prefixed user guid and the given logger' do + allow(store).to receive(:decrement).and_call_original + concurrent_request_counter.decrement(user_guid, logger) + expect(store).to have_received(:decrement).with("test:#{user_guid}", logger) + end + + it 'decreases the number of concurrent requests, allowing for another concurrent request' do + max_concurrent_requests.times { concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) } + concurrent_request_counter.decrement(user_guid, logger) + expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_truthy + end + + it 'does not decrease the number of concurrent requests below zero' do + concurrent_request_counter.decrement(user_guid, logger) + max_concurrent_requests.times { concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger) } + expect(concurrent_request_counter.try_increment?(user_guid, max_concurrent_requests, logger)).to be_falsey + end + + it 'writes an error log in case of a Redis error' do + skip('Not relevant for InMemoryStore') if store_implementation == :in_memory + + allow_any_instance_of(MockRedis::StringMethods).to receive(:decr).and_raise(Redis::ConnectionError) + allow_any_instance_of(Redis).to receive(:decr).and_raise(Redis::ConnectionError) + allow(logger).to receive(:error) + + concurrent_request_counter.decrement(user_guid, logger) + expect(logger).to have_received(:error).with(/Redis error/) + end + end end end end