Skip to content

Commit

Permalink
Fix cc.requests.outstanding.gauge when using puma web server
Browse files Browse the repository at this point in the history
#1312 introduced `cc.requests.outstanding.gauge` which holds the counter in memory. With the introduction of puma there may be multiple processes, so each would emit its own value for this metric. This would cause the gauge to flop between values. This metric is listed as an important kpi for capi scaling https://docs.cloudfoundry.org/running/managing-cf/scaling-cloud-controller.html#cloud_controller_ng.

This fix for puma will instead uses Redis for the gauge.

* Move requests_metric to use statsd_updater
* Statsd_updater now contains statsd request logic
* Add Redis/Inmemory store to statsd_updager
* Add a missing mutux to counter for thread safety for the in memory implementation.
* Chose to prefix the entry in redis with `metrics:` but open to feedback here.

Inspired by 4539e59

An alternative considered, was to read the prometheus metric and re-emit that to StatsD, however we observed performance degradation. Presumably because of the number of reads from disk for the [DirectFileStorage](https://github.com/prometheus/client_ruby?tab=readme-ov-file#directfilestore-caveats-and-things-to-keep-in-mind) to aggregate the metric across processes.
  • Loading branch information
Samze authored and sethboyles committed Jun 14, 2024
1 parent d1d1d97 commit 82db58e
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 54 deletions.
22 changes: 5 additions & 17 deletions lib/cloud_controller/metrics/request_metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,21 @@
module VCAP::CloudController
module Metrics
class RequestMetrics
def initialize(statsd=CloudController::DependencyLocator.instance.statsd_client, prometheus_updater=CloudController::DependencyLocator.instance.prometheus_updater)
@counter = 0
@statsd = statsd
def initialize(statsd_updater=CloudController::DependencyLocator.instance.statsd_updater, prometheus_updater=CloudController::DependencyLocator.instance.prometheus_updater)
@statsd_updater = statsd_updater
@prometheus_updater = prometheus_updater
end

def start_request
@counter += 1
@statsd.gauge('cc.requests.outstanding.gauge', @counter)
@statsd.increment 'cc.requests.outstanding'

@statsd_updater.start_request
@prometheus_updater.increment_gauge_metric(:cc_requests_outstanding_total)
end

def complete_request(status)
http_status_code = "#{status.to_s[0]}XX"
http_status_metric = "cc.http_status.#{http_status_code}"
@counter -= 1
@statsd.gauge('cc.requests.outstanding.gauge', @counter)
@statsd.batch do |batch|
batch.decrement 'cc.requests.outstanding'
batch.increment 'cc.requests.completed'
batch.increment http_status_metric
end
@statsd_updater.complete_request(status)

@prometheus_updater.decrement_gauge_metric(:cc_requests_outstanding_total)
@prometheus_updater.increment_counter_metric(:cc_requests_completed_total)
@prometheus_updater.decrement_gauge_metric(:cc_requests_outstanding_total)
end
end
end
Expand Down
65 changes: 64 additions & 1 deletion lib/cloud_controller/metrics/statsd_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

module VCAP::CloudController::Metrics
class StatsdUpdater
def initialize(statsd=CloudController::DependencyLocator.instance.statsd_client)
def initialize(statsd=CloudController::DependencyLocator.instance.statsd_client, redis_connection_pool_size: nil)
@statsd = statsd
@redis_connection_pool_size = redis_connection_pool_size
end

def update_deploying_count(deploying_count)
Expand Down Expand Up @@ -93,10 +94,72 @@ def report_staging_failure_metrics(duration_ns)
@statsd.timing('cc.staging.failed_duration', nanoseconds_to_milliseconds(duration_ns))
end

def start_request
@statsd.increment('cc.requests.outstanding')
@statsd.gauge('cc.requests.outstanding.gauge', store.increment_request_outstanding_gauge)
end

def complete_request(status)
http_status_code = "#{status.to_s[0]}XX"
http_status_metric = "cc.http_status.#{http_status_code}"
@statsd.gauge('cc.requests.outstanding.gauge', store.decrement_request_outstanding_gauge)
@statsd.batch do |batch|
batch.decrement 'cc.requests.outstanding'
batch.increment 'cc.requests.completed'
batch.increment http_status_metric
end
end

private

def nanoseconds_to_milliseconds(time_ns)
(time_ns / 1e6).to_i
end

def store
return @store if defined?(@store)

redis_socket = VCAP::CloudController::Config.config.get(:redis, :socket)
@store = redis_socket.nil? ? InMemoryStore.new : RedisStore.new(redis_socket, @redis_connection_pool_size)
end

class InMemoryStore
def initialize
@mutex = Mutex.new
@counter = 0
end

def increment_request_outstanding_gauge
@mutex.synchronize do
@counter += 1
end
end

def decrement_request_outstanding_gauge
@mutex.synchronize do
@counter -= 1
end
end
end

class RedisStore
def initialize(socket, connection_pool_size)
connection_pool_size ||= VCAP::CloudController::Config.config.get(:puma, :max_threads) || 1
@redis = ConnectionPool::Wrapper.new(size: connection_pool_size) do
Redis.new(timeout: 1, path: socket)
end
@prefix = 'metrics'
@request_outstanding_gauge_key = "#{@prefix}:cc.requests.outstanding.gauge"
@redis.set(@request_outstanding_gauge_key, 0)
end

def increment_request_outstanding_gauge
@redis.incr(@request_outstanding_gauge_key)
end

def decrement_request_outstanding_gauge
@redis.decr(@request_outstanding_gauge_key)
end
end
end
end
2 changes: 1 addition & 1 deletion lib/cloud_controller/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def initialize(argv)

request_logs = VCAP::CloudController::Logs::RequestLogs.new(Steno.logger('cc.api'))

request_metrics = VCAP::CloudController::Metrics::RequestMetrics.new(CloudController::DependencyLocator.instance.statsd_client,
request_metrics = VCAP::CloudController::Metrics::RequestMetrics.new(CloudController::DependencyLocator.instance.statsd_updater,
CloudController::DependencyLocator.instance.prometheus_updater)
builder = RackAppBuilder.new
app = builder.build(@config, request_metrics, request_logs)
Expand Down
41 changes: 6 additions & 35 deletions spec/unit/lib/cloud_controller/metrics/request_metrics_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,37 @@

module VCAP::CloudController::Metrics
RSpec.describe RequestMetrics do
let(:statsd_client) { double(:statsd_client) }
let(:statsd_updater) { double(:statsd_updater) }
let(:prometheus_client) { double(:prometheus_client) }
let(:request_metrics) { RequestMetrics.new(statsd_client, prometheus_client) }
let(:request_metrics) { RequestMetrics.new(statsd_updater, prometheus_client) }

before do
allow(prometheus_client).to receive(:update_gauge_metric)
allow(prometheus_client).to receive(:decrement_gauge_metric)
allow(prometheus_client).to receive(:increment_gauge_metric)
allow(prometheus_client).to receive(:increment_counter_metric)
allow(statsd_updater).to receive(:start_request)
allow(statsd_updater).to receive(:complete_request)
end

describe '#start_request' do
before do
allow(statsd_client).to receive(:increment)
allow(statsd_client).to receive(:gauge)
end

it 'increments outstanding requests for statsd' do
request_metrics.start_request

expect(statsd_client).to have_received(:gauge).with('cc.requests.outstanding.gauge', 1)
expect(statsd_client).to have_received(:increment).with('cc.requests.outstanding')
expect(statsd_updater).to have_received(:start_request)
end

it 'increments outstanding requests for prometheus' do
request_metrics.start_request

expect(prometheus_client).to have_received(:increment_gauge_metric).with(:cc_requests_outstanding_total)
end
end

describe '#complete_request' do
let(:batch) { double(:batch) }
let(:status) { 204 }

before do
allow(statsd_client).to receive(:batch).and_yield(batch)
allow(statsd_client).to receive(:gauge)
allow(batch).to receive(:increment)
allow(batch).to receive(:decrement)
end

it 'increments completed, decrements outstanding, increments status for statsd' do
request_metrics.complete_request(status)

expect(statsd_client).to have_received(:gauge).with('cc.requests.outstanding.gauge', -1)
expect(batch).to have_received(:decrement).with('cc.requests.outstanding')
expect(batch).to have_received(:increment).with('cc.requests.completed')
expect(batch).to have_received(:increment).with('cc.http_status.2XX')
expect(statsd_updater).to have_received(:complete_request).with(status)
end

it 'increments completed and decrements outstanding for prometheus' do
Expand All @@ -60,17 +42,6 @@ module VCAP::CloudController::Metrics
expect(prometheus_client).to have_received(:decrement_gauge_metric).with(:cc_requests_outstanding_total)
expect(prometheus_client).to have_received(:increment_counter_metric).with(:cc_requests_completed_total)
end

it 'normalizes http status codes in statsd' do
request_metrics.complete_request(200)
expect(batch).to have_received(:increment).with('cc.http_status.2XX')

request_metrics.complete_request(300)
expect(batch).to have_received(:increment).with('cc.http_status.3XX')

request_metrics.complete_request(400)
expect(batch).to have_received(:increment).with('cc.http_status.4XX')
end
end
end
end
175 changes: 175 additions & 0 deletions spec/unit/lib/cloud_controller/metrics/statsd_updater_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module VCAP::CloudController::Metrics
RSpec.describe StatsdUpdater do
let(:updater) { StatsdUpdater.new(statsd_client) }
let(:statsd_client) { Statsd.new('localhost', 9999) }
let(:store) { double(:store) }

describe '#update_deploying_count' do
before do
Expand Down Expand Up @@ -281,5 +282,179 @@ module VCAP::CloudController::Metrics
expect(statsd_client).to have_received(:timing).with('cc.staging.failed_duration', duration_ms)
end
end

describe '#start_request' do
before do
allow(statsd_client).to receive(:increment)
allow(statsd_client).to receive(:gauge)
allow(updater).to receive(:store).and_return(store)
allow(store).to receive(:increment_request_outstanding_gauge).and_return(4)
end

it 'increments outstanding requests for statsd' do
updater.start_request

expect(store).to have_received(:increment_request_outstanding_gauge)
expect(statsd_client).to have_received(:gauge).with('cc.requests.outstanding.gauge', 4)
expect(statsd_client).to have_received(:increment).with('cc.requests.outstanding')
end
end

describe '#complete_request' do
let(:batch) { double(:batch) }
let(:status) { 204 }

before do
allow(statsd_client).to receive(:batch).and_yield(batch)
allow(statsd_client).to receive(:gauge)
allow(batch).to receive(:increment)
allow(batch).to receive(:decrement)
allow(updater).to receive(:store).and_return(store)
allow(store).to receive(:decrement_request_outstanding_gauge).and_return(5)
end

it 'increments completed, decrements outstanding, increments status for statsd' do
updater.complete_request(status)

expect(store).to have_received(:decrement_request_outstanding_gauge)
expect(statsd_client).to have_received(:gauge).with('cc.requests.outstanding.gauge', 5)
expect(batch).to have_received(:decrement).with('cc.requests.outstanding')
expect(batch).to have_received(:increment).with('cc.requests.completed')
expect(batch).to have_received(:increment).with('cc.http_status.2XX')
end

it 'normalizes http status codes in statsd' do
updater.complete_request(200)
expect(batch).to have_received(:increment).with('cc.http_status.2XX')

updater.complete_request(300)
expect(batch).to have_received(:increment).with('cc.http_status.3XX')

updater.complete_request(400)
expect(batch).to have_received(:increment).with('cc.http_status.4XX')
end
end

describe '#store' do
let(:config) { double(:config) }

before do
allow(config).to receive(:get).with(:cc, :enable_statsd_metrics).and_return(true)
allow(VCAP::CloudController::Config).to receive(:config).and_return(config)
end

context 'when redis socket is not configured' do
before do
allow(config).to receive(:get).with(:redis, :socket).and_return(nil)
end

it 'returns an instance of InMemoryStore' do
store = updater.send(:store)
expect(store).to be_an_instance_of(StatsdUpdater::InMemoryStore)
end
end

context 'when redis socket is configured' do
let(:redis_socket) { 'redis.sock' }

before do
allow(config).to receive(:get).with(:redis, :socket).and_return(redis_socket)
allow(config).to receive(:get).with(:puma, :max_threads).and_return(nil)
end

it 'returns an instance of RedisStore' do
expect(ConnectionPool::Wrapper).to receive(:new).with(size: 1).and_call_original
store = updater.send(:store)
expect(store).to be_an_instance_of(StatsdUpdater::RedisStore)
end

context 'when puma max threads is set' do
let(:pool_size) { 10 }

before do
allow(config).to receive(:get).with(:puma, :max_threads).and_return(pool_size)
end

it 'passes the connection pool size to RedisStore' do
expect(ConnectionPool::Wrapper).to receive(:new).with(size: pool_size).and_call_original
updater.send(:store)
end
end
end
end

describe StatsdUpdater::InMemoryStore do
let(:store) { StatsdUpdater::InMemoryStore.new }

it 'increments the counter' do
expect(store.increment_request_outstanding_gauge).to eq(1)
expect(store.increment_request_outstanding_gauge).to eq(2)
expect(store.increment_request_outstanding_gauge).to eq(3)
end

it 'decrements the counter' do
expect(store.decrement_request_outstanding_gauge).to eq(-1)
expect(store.decrement_request_outstanding_gauge).to eq(-2)
expect(store.decrement_request_outstanding_gauge).to eq(-3)
end
end

describe StatsdUpdater::RedisStore do
let(:redis_socket) { 'redis.sock' }
let(:connection_pool_size) { 5 }
let(:redis_store) { StatsdUpdater::RedisStore.new(redis_socket, connection_pool_size) }
let(:redis) { instance_double(Redis) }
let(:config) { double(:config) }

before do
allow(VCAP::CloudController::Config).to receive(:config).and_return(config)
allow(ConnectionPool::Wrapper).to receive(:new).and_return(redis)
allow(redis).to receive(:set)
end

describe 'initialization' do
it 'clears cc.requests.outstanding.gauge' do
expect(redis).to receive(:set).with('metrics:cc.requests.outstanding.gauge', 0)
StatsdUpdater::RedisStore.new(redis_socket, connection_pool_size)
end

it 'configures a Redis connection pool with specified size' do
expect(ConnectionPool::Wrapper).to receive(:new).with(size: connection_pool_size).and_call_original
StatsdUpdater::RedisStore.new(redis_socket, connection_pool_size)
end

context 'when the connection pool size is not provided' do
before do
allow(config).to receive(:get).with(:puma, :max_threads).and_return(nil)
end

it 'uses a default connection pool size of 1' do
expect(ConnectionPool::Wrapper).to receive(:new).with(size: 1).and_call_original
StatsdUpdater::RedisStore.new(redis_socket, nil)
end

context 'when puma max threads is set' do
before do
allow(config).to receive(:get).with(:puma, :max_threads).and_return(10)
end

it 'uses puma max threads' do
expect(ConnectionPool::Wrapper).to receive(:new).with(size: 10).and_call_original
StatsdUpdater::RedisStore.new(redis_socket, nil)
end
end
end
end

it 'increments the gauge in Redis' do
allow(redis).to receive(:incr).with('metrics:cc.requests.outstanding.gauge').and_return(1)
expect(redis_store.increment_request_outstanding_gauge).to eq(1)
end

it 'decrements the gauge in Redis' do
allow(redis).to receive(:decr).with('metrics:cc.requests.outstanding.gauge').and_return(-1)
expect(redis_store.decrement_request_outstanding_gauge).to eq(-1)
end
end
end
end

0 comments on commit 82db58e

Please sign in to comment.