From 28582cbc2e7f828550d22d913baeb9edff3322bd Mon Sep 17 00:00:00 2001 From: Sam Gunaratne Date: Fri, 31 May 2024 12:50:16 -0600 Subject: [PATCH] Fix cc.requests.outstanding.gauge when using puma web server https://github.com/cloudfoundry/cloud_controller_ng/issues/1312 introduced cc.requests.outstanding.gauge which holds the counter in memory. With the introduction of puma there may be multiple processes, each would have its own value for this metric. This would cause the gauge to flop between values. This fix, specifically for puma instead uses redis. Additionally, add a missing mutux to counter for thread safety for the in memory implementation. Inspired by https://github.com/cloudfoundry/cloud_controller_ng/commit/4539e596ab6ae64556b170e4387633e9ebd55292 --- .../metrics/request_metrics.rb | 62 +++++++-- .../metrics/request_metrics_spec.rb | 120 +++++++++++++++++- 2 files changed, 172 insertions(+), 10 deletions(-) diff --git a/lib/cloud_controller/metrics/request_metrics.rb b/lib/cloud_controller/metrics/request_metrics.rb index 2d3e597a272..ff2386bc7ed 100644 --- a/lib/cloud_controller/metrics/request_metrics.rb +++ b/lib/cloud_controller/metrics/request_metrics.rb @@ -3,33 +3,79 @@ module VCAP::CloudController module Metrics class RequestMetrics - def initialize(statsd=CloudController::DependencyLocator.instance.statsd_client, prometheus_updater=CloudController::DependencyLocator.instance.prometheus_updater) - @counter = 0 + def initialize(statsd=CloudController::DependencyLocator.instance.statsd_client, prometheus_updater=CloudController::DependencyLocator.instance.prometheus_updater, redis_connection_pool_size: nil) + @mutex = Mutex.new @statsd = statsd @prometheus_updater = prometheus_updater end def start_request - @counter += 1 - @statsd.gauge('cc.requests.outstanding.gauge', @counter) @statsd.increment 'cc.requests.outstanding' - @prometheus_updater.increment_gauge_metric(:cc_requests_outstanding_total) + gauge = store.increment + @statsd.gauge('cc.requests.outstanding.gauge', gauge) end def complete_request(status) http_status_code = "#{status.to_s[0]}XX" http_status_metric = "cc.http_status.#{http_status_code}" - @counter -= 1 - @statsd.gauge('cc.requests.outstanding.gauge', @counter) @statsd.batch do |batch| batch.decrement 'cc.requests.outstanding' batch.increment 'cc.requests.completed' batch.increment http_status_metric end - @prometheus_updater.decrement_gauge_metric(:cc_requests_outstanding_total) @prometheus_updater.increment_counter_metric(:cc_requests_completed_total) + @prometheus_updater.decrement_gauge_metric(:cc_requests_outstanding_total) + + gauge = store.decrement + @statsd.gauge('cc.requests.outstanding.gauge', gauge) + end + + private + + def store + return @store if defined?(@store) + + redis_socket = VCAP::CloudController::Config.config.get(:redis, :socket) + @store = redis_socket.nil? ? InMemoryStore.new : RedisStore.new(redis_socket, @redis_connection_pool_size) + end + + class InMemoryStore + def initialize + @mutex = Mutex.new + @counter = 0 + end + + def increment + @mutex.synchronize do + @counter += 1 + end + end + + def decrement + @mutex.synchronize do + @counter -= 1 + end + end + end + + class RedisStore + def initialize(socket, connection_pool_size) + connection_pool_size ||= VCAP::CloudController::Config.config.get(:puma, :max_threads) || 1 + @redis = ConnectionPool::Wrapper.new(size: connection_pool_size) do + Redis.new(timeout: 1, path: socket) + end + @redis.set("cc.requests.outstanding.gauge", 0) + end + + def increment + @redis.incr("cc.requests.outstanding.gauge") + end + + def decrement + @redis.decr("cc.requests.outstanding.gauge") + end end end end diff --git a/spec/unit/lib/cloud_controller/metrics/request_metrics_spec.rb b/spec/unit/lib/cloud_controller/metrics/request_metrics_spec.rb index 8d5bf7dd220..c6609153cef 100644 --- a/spec/unit/lib/cloud_controller/metrics/request_metrics_spec.rb +++ b/spec/unit/lib/cloud_controller/metrics/request_metrics_spec.rb @@ -5,6 +5,7 @@ module VCAP::CloudController::Metrics RSpec.describe RequestMetrics do let(:statsd_client) { double(:statsd_client) } let(:prometheus_client) { double(:prometheus_client) } + let(:store) { double(:store) } let(:request_metrics) { RequestMetrics.new(statsd_client, prometheus_client) } before do @@ -18,12 +19,15 @@ module VCAP::CloudController::Metrics before do allow(statsd_client).to receive(:increment) allow(statsd_client).to receive(:gauge) + allow(request_metrics).to receive(:store).and_return(store) + allow(store).to receive(:increment).and_return(4) end it 'increments outstanding requests for statsd' do request_metrics.start_request - expect(statsd_client).to have_received(:gauge).with('cc.requests.outstanding.gauge', 1) + expect(store).to have_received(:increment) + expect(statsd_client).to have_received(:gauge).with('cc.requests.outstanding.gauge', 4) expect(statsd_client).to have_received(:increment).with('cc.requests.outstanding') end @@ -43,12 +47,15 @@ module VCAP::CloudController::Metrics allow(statsd_client).to receive(:gauge) allow(batch).to receive(:increment) allow(batch).to receive(:decrement) + allow(request_metrics).to receive(:store).and_return(store) + allow(store).to receive(:decrement).and_return(5) end it 'increments completed, decrements outstanding, increments status for statsd' do request_metrics.complete_request(status) - expect(statsd_client).to have_received(:gauge).with('cc.requests.outstanding.gauge', -1) + expect(store).to have_received(:decrement) + expect(statsd_client).to have_received(:gauge).with('cc.requests.outstanding.gauge', 5) expect(batch).to have_received(:decrement).with('cc.requests.outstanding') expect(batch).to have_received(:increment).with('cc.requests.completed') expect(batch).to have_received(:increment).with('cc.http_status.2XX') @@ -72,5 +79,114 @@ module VCAP::CloudController::Metrics expect(batch).to have_received(:increment).with('cc.http_status.4XX') end end + + describe '#store' do + context 'when redis socket is not configured' do + before do + allow(VCAP::CloudController::Config).to receive_message_chain(:config, :get).with(:redis, :socket).and_return(nil) + end + + it 'returns an instance of InMemoryStore' do + store = request_metrics.send(:store) + expect(store).to be_an_instance_of(RequestMetrics::InMemoryStore) + end + end + + context 'when redis socket is configured' do + let(:redis_socket) { 'redis.sock' } + + before do + allow(VCAP::CloudController::Config).to receive_message_chain(:config, :get).with(:redis, :socket).and_return(redis_socket) + allow(VCAP::CloudController::Config).to receive_message_chain(:config, :get).with(:puma, :max_threads).and_return(nil) + end + + it 'returns an instance of RedisStore' do + expect(ConnectionPool::Wrapper).to receive(:new).with(size: 1).and_call_original + store = request_metrics.send(:store) + expect(store).to be_an_instance_of(RequestMetrics::RedisStore) + end + + context 'when puma max threads is set' do + let(:pool_size) { 10 } + before do + allow(VCAP::CloudController::Config).to receive_message_chain(:config, :get).with(:redis, :socket).and_return(redis_socket) + allow(VCAP::CloudController::Config).to receive_message_chain(:config, :get).with(:puma, :max_threads).and_return(pool_size) + end + + it 'passes the connection pool size to RedisStore' do + expect(ConnectionPool::Wrapper).to receive(:new).with(size: pool_size).and_call_original + store = request_metrics.send(:store) + end + end + end + end + + describe RequestMetrics::InMemoryStore do + let(:store) { RequestMetrics::InMemoryStore.new } + + it 'increments the counter' do + expect(store.increment).to eq(1) + expect(store.increment).to eq(2) + expect(store.increment).to eq(3) + end + + it 'decrements the counter' do + expect(store.decrement).to eq(-1) + expect(store.decrement).to eq(-2) + expect(store.decrement).to eq(-3) + end + end + + describe RequestMetrics::RedisStore do + let(:redis_socket) { 'redis.sock' } + let(:connection_pool_size) { 5 } + let(:redis_store) { RequestMetrics::RedisStore.new(redis_socket, connection_pool_size) } + let(:redis) { instance_double('Redis') } + + before do + allow(ConnectionPool::Wrapper).to receive(:new).and_return(redis) + allow(redis).to receive(:set) + end + + describe 'initialization' do + it 'clears cc.requests.outstanding.gauge' do + expect(redis).to receive(:set).with("cc.requests.outstanding.gauge", 0) + RequestMetrics::RedisStore.new(redis_socket, connection_pool_size) + end + + it 'configures a Redis connection pool with specified size' do + expect(ConnectionPool::Wrapper).to receive(:new).with(size: connection_pool_size).and_call_original + RequestMetrics::RedisStore.new(redis_socket, connection_pool_size) + end + + context 'when the connection pool size is not provided' do + it 'uses a default connection pool size of 1' do + expect(ConnectionPool::Wrapper).to receive(:new).with(size: 1).and_call_original + RequestMetrics::RedisStore.new(redis_socket, nil) + end + + context 'when puma max threads is set' do + before do + allow(VCAP::CloudController::Config).to receive_message_chain(:config, :get).with(:puma, :max_threads).and_return(10) + end + + it 'uses puma max threads' do + expect(ConnectionPool::Wrapper).to receive(:new).with(size: 10).and_call_original + RequestMetrics::RedisStore.new(redis_socket, nil) + end + end + end + end + + it 'increments the gauge in Redis' do + allow(redis).to receive(:incr).and_return(1) + expect(redis_store.increment).to eq(1) + end + + it 'decrements the gauge in Redis' do + allow(redis).to receive(:decr).and_return(-1) + expect(redis_store.decrement).to eq(-1) + end + end end end