# Builds an ErrorRateCircuitBreaker for +name+ from the registration options.
#
# The five rate-specific settings are checked via require_keys! before the
# breaker is built. ::Semian::BaseError is always appended to whatever
# :exceptions the caller supplied (a single class, a list, or nothing).
# :half_open_resource_timeout is optional and is forwarded as nil when absent.
def create_error_rate_circuit_breaker(name, **options)
  mandatory = %i[success_threshold error_percent_threshold error_timeout request_volume_threshold window_size]
  require_keys!(mandatory, options)

  tracked_errors = Array(options[:exceptions] || []) + [::Semian::BaseError]

  # Copy the plain pass-through settings straight out of the options hash.
  settings = (mandatory + %i[half_open_resource_timeout]).each_with_object({}) do |key, acc|
    acc[key] = options[key]
  end
  settings[:exceptions] = tracked_errors
  settings[:implementation] = implementation(**options)

  ErrorRateCircuitBreaker.new(name, **settings)
end
module Semian
  # Circuit breaker that trips on the *proportion* of failed calls held in a
  # results window instead of an absolute error count.
  #
  # * closed    - calls run. The circuit opens once the window holds at least
  #               +request_volume_threshold+ results and failures / total is
  #               >= +error_percent_threshold+.
  # * open      - calls are rejected with OpenCircuitError until
  #               +error_timeout+ seconds (monotonic clock) have elapsed since
  #               the last recorded error.
  # * half-open - calls run again. One marked failure re-opens the circuit;
  #               +success_threshold+ recorded successes close it.
  #
  # State and the results window come from +implementation+
  # (+implementation::State+ / +implementation::TimeSlidingWindow+).
  # NOTE(review): +window_size+ is handed to TimeSlidingWindow untouched; it
  # is presumably a duration in seconds - confirm against that class.
  class ErrorRateCircuitBreaker #:nodoc:
    extend Forwardable

    def_delegators :@state, :closed?, :open?, :half_open?

    attr_reader :name, :half_open_resource_timeout, :error_timeout, :state, :last_error, :error_percent_threshold,
                :request_volume_threshold, :success_count_threshold

    # name                       - identifier, stored as a Symbol.
    # exceptions                 - exception classes rescued by #acquire and counted as failures.
    # error_percent_threshold    - failure ratio, strictly between 0.0 and 1.0.
    # error_timeout              - seconds after the last error before a probe is allowed.
    # window_size                - passed to implementation::TimeSlidingWindow.new.
    # request_volume_threshold   - minimum number of results before the ratio is evaluated.
    # success_threshold          - successes required in half-open state to close.
    # implementation             - namespace providing TimeSlidingWindow and State.
    # half_open_resource_timeout - optional timeout applied to the resource while half-open.
    #
    # Raises RuntimeError when error_percent_threshold is not in (0.0, 1.0).
    def initialize(name, exceptions:, error_percent_threshold:, error_timeout:, window_size:,
                   request_volume_threshold:, success_threshold:, implementation:, half_open_resource_timeout: nil)
      # Open interval on both ends: the old lower bound of 0.0001 silently
      # rejected small but valid ratios that the message claims to accept.
      unless (0.0...1.0).cover?(error_percent_threshold) && error_percent_threshold > 0
        raise 'error_percent_threshold should be between 0.0 and 1.0 exclusive'
      end

      @name = name.to_sym
      @error_timeout = error_timeout
      @exceptions = exceptions
      @half_open_resource_timeout = half_open_resource_timeout
      @error_percent_threshold = error_percent_threshold
      @last_error_time = nil
      @request_volume_threshold = request_volume_threshold
      @success_count_threshold = success_threshold

      @results = implementation::TimeSlidingWindow.new(window_size)
      @state = implementation::State.new

      reset
    end

    # Runs the block under circuit protection and returns its value.
    #
    # Raises OpenCircuitError when the circuit is open and the error timeout
    # has not expired. Exceptions listed in +exceptions+ are recorded as
    # failures (unless the error answers marks_semian_circuits? with false)
    # and then re-raised unchanged.
    def acquire(resource = nil, &block)
      return yield if disabled?
      transition_to_half_open if transition_to_half_open?

      raise OpenCircuitError unless request_allowed?

      begin
        result = maybe_with_half_open_resource_timeout(resource, &block)
      rescue *@exceptions => error
        mark_failed(error) if !error.respond_to?(:marks_semian_circuits?) || error.marks_semian_circuits?
        raise
      end

      # Only reached when the block did not raise one of the tracked errors.
      mark_success
      result
    end

    # True when the circuit is open and the error timeout has elapsed, i.e.
    # the next #acquire would move it to half-open. Does not change state.
    def transition_to_half_open?
      open? && error_timeout_expired? && !half_open?
    end

    # True when a call would currently be let through. Does not change state.
    def request_allowed?
      closed? || half_open? || transition_to_half_open?
    end

    # Records a failure; opens the circuit when closed and the error ratio is
    # reached, or immediately when half-open.
    def mark_failed(error)
      push_error(error)
      if closed?
        transition_to_open if error_threshold_reached?
      elsif half_open?
        transition_to_open
      end
    end

    # Records a success; closes a half-open circuit once enough have been seen.
    def mark_success
      @results << true
      return unless half_open?
      transition_to_close if success_threshold_reached?
    end

    # Forgets the last error time and all recorded results, then closes the circuit.
    def reset
      @last_error_time = nil
      @results.clear
      transition_to_close
    end

    def destroy
      @state.destroy
    end

    # TODO: understand what this is used for inside Semian lib
    # False once the error timeout has expired; otherwise true while the
    # window still holds at least one failure.
    def in_use?
      return false if error_timeout_expired?
      @results.count(false) > 0
    end

    private

    # Monotonic clock so wall-clock adjustments cannot distort the timeout.
    def current_time
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end

    # Notification and logging run before the state changes so the log line
    # can report the state being left.
    def transition_to_close
      notify_state_transition(:closed)
      log_state_transition(:closed)
      @state.close!
      @results.clear
    end

    def transition_to_open
      notify_state_transition(:open)
      log_state_transition(:open)
      @state.open!
    end

    def transition_to_half_open
      notify_state_transition(:half_open)
      log_state_transition(:half_open)
      @state.half_open!
      # Start the probe phase with an empty window so only new results count.
      @results.clear
    end

    def success_threshold_reached?
      @results.count(true) >= @success_count_threshold
    end

    def error_threshold_reached?
      # The empty? guard also prevents 0/0 when request_volume_threshold is 0.
      return false if @results.empty? || @results.length < @request_volume_threshold
      @results.count(false).to_f / @results.length.to_f >= @error_percent_threshold
    end

    def error_timeout_expired?
      return false unless @last_error_time
      current_time - @last_error_time >= @error_timeout
    end

    def push_error(error)
      @last_error = error
      @last_error_time = current_time
      @results << false
    end

    # Logs a transition at info level; silent when the state would not change.
    def log_state_transition(new_state)
      return if @state.nil? || new_state == @state.value

      # Built with join instead of String#<< so the method keeps working if
      # the file is ever switched to frozen string literals.
      message = [
        "[#{self.class.name}] State transition from #{@state.value} to #{new_state}.",
        " success_count=#{@results.count(true)} error_count=#{@results.count(false)}",
        " success_count_threshold=#{@success_count_threshold} error_count_percent=#{@error_percent_threshold}",
        " error_timeout=#{@error_timeout} error_last_at=\"#{@last_error_time}\"",
        " name=\"#{@name}\"",
      ].join
      Semian.logger.info(message)
    end

    def notify_state_transition(new_state)
      Semian.notify(:state_change, self, nil, nil, state: new_state)
    end

    # Truthy when either environment switch is set (to any value).
    def disabled?
      ENV['SEMIAN_CIRCUIT_BREAKER_DISABLED'] || ENV['SEMIAN_DISABLED']
    end

    # While half-open, wraps the call in the resource's shorter timeout when
    # one is configured and the resource supports it; otherwise calls straight through.
    def maybe_with_half_open_resource_timeout(resource, &block)
      if half_open? && @half_open_resource_timeout && resource.respond_to?(:with_resource_timeout)
        resource.with_resource_timeout(@half_open_resource_timeout) do
          block.call
        end
      else
        block.call
      end
    end
  end
end
require 'test_helper'

# Exercises ErrorRateCircuitBreaker through its public API. Breakers are
# built with a 1s error timeout and real sleeps are used to get past it.
class TestErrorRateCircuitBreaker < Minitest::Test
  include CircuitBreakerHelper

  def setup
    @strio = StringIO.new
    Semian.logger = Logger.new(@strio)
    begin
      Semian.destroy(:testing)
    rescue
      nil
    end
    @resource = new_error_rate_breaker
  end

  # The breaker reads the monotonic clock, which Timecop does not control,
  # so waiting has to be real.
  def sleep_for_sec(seconds)
    sleep(seconds)
  end

  # Opens the circuit and waits past the error timeout so the next acquire
  # will probe in half-open state.
  def half_open_circuit(resource = @resource)
    open_circuit!(resource)
    assert_circuit_opened(resource)
    sleep_for_sec(1.1)
    assert(resource.transition_to_half_open?, 'Expect breaker to be half-open')
  end

  def test_error_threshold_must_be_between_0_and_1
    [1.0, 0.0].each do |out_of_range|
      assert_raises RuntimeError do
        new_error_rate_breaker(error_percent_threshold: out_of_range)
      end
    end
  end

  def test_acquire_yield_when_the_circuit_is_closed
    block_called = false
    @resource.acquire { block_called = true }
    assert_equal true, block_called
  end

  def test_acquire_raises_circuit_open_error_when_the_circuit_is_open
    open_circuit!
    assert_raises Semian::OpenCircuitError do
      @resource.acquire { 1 + 1 }
    end
    assert_match(/State transition from closed to open/, @strio.string)
  end

  def test_after_error_threshold_the_circuit_is_open
    open_circuit!
    assert_circuit_opened
  end

  def test_after_error_timeout_is_elapsed_requests_are_attempted_again
    half_open_circuit
    assert_circuit_closed
  end

  def test_until_success_threshold_is_reached_a_single_error_will_reopen_the_circuit
    half_open_circuit
    trigger_error!
    assert_circuit_opened
  end

  def test_once_success_threshold_is_reached_only_error_threshold_will_open_the_circuit_again
    half_open_circuit
    assert_circuit_closed
    trigger_error!
    assert_circuit_closed
    trigger_error!
    assert_circuit_opened
  end

  def test_reset_allow_to_close_the_circuit_and_forget_errors
    open_circuit!
    @resource.reset
    assert_match(/State transition from open to closed/, @strio.string)
    assert_circuit_closed
  end

  def test_errors_more_than_duration_apart_doesnt_open_circuit
    trigger_error!
    sleep_for_sec(2) # long enough for the first error to leave the window
    trigger_error!
    assert_circuit_closed
  end

  def test_errors_under_threshold_doesnt_open_circuit
    # Three successes out of five calls.
    2.times { @resource.acquire { 1 + 1 } }
    trigger_error!
    @resource.acquire { 1 + 1 }
    trigger_error!
    assert_circuit_closed
  end

  def test_request_allowed_query_doesnt_trigger_transitions
    open_circuit!
    refute_predicate @resource, :request_allowed?
    assert_predicate @resource, :open?
    sleep_for_sec(1.1)
    assert_predicate @resource, :request_allowed?
    assert_predicate @resource, :open?
  end

  def test_open_close_open_cycle
    resource = new_error_rate_breaker(success_threshold: 2)

    # The same open -> half-open -> closed round trip must work repeatedly.
    2.times do
      open_circuit!(resource)
      assert_circuit_opened(resource)

      sleep_for_sec(1.1)

      assert_circuit_closed(resource)
      assert resource.half_open?

      assert_circuit_closed(resource)
      assert resource.closed?
    end
  end

  def test_env_var_disables_circuit_breaker
    ENV['SEMIAN_CIRCUIT_BREAKER_DISABLED'] = '1'
    open_circuit!
    assert_circuit_closed
  ensure
    ENV.delete('SEMIAN_CIRCUIT_BREAKER_DISABLED')
  end

  def test_semian_wide_env_var_disables_circuit_breaker
    ENV['SEMIAN_DISABLED'] = '1'
    open_circuit!
    assert_circuit_closed
  ensure
    ENV.delete('SEMIAN_DISABLED')
  end

  # Minimal stand-in for an adapter that supports temporary timeouts.
  class RawResource
    def timeout
      @timeout || 2
    end

    def with_resource_timeout(timeout)
      prev_timeout = @timeout
      @timeout = timeout
      yield
    ensure
      @timeout = prev_timeout
    end
  end

  def test_changes_resource_timeout_when_configured
    resource = new_resource_timeout_breaker

    half_open_circuit(resource)
    assert_circuit_closed(resource)
    assert resource.half_open?

    raw_resource = RawResource.new

    triggered = false
    resource.acquire(raw_resource) do
      triggered = true
      assert_equal 0.123, raw_resource.timeout
    end

    assert triggered
    assert_equal 2, raw_resource.timeout
  end

  def test_doesnt_change_resource_timeout_when_closed
    resource = new_resource_timeout_breaker
    raw_resource = RawResource.new

    triggered = false
    resource.acquire(raw_resource) do
      triggered = true
      assert_equal 2, raw_resource.timeout
    end

    assert triggered
    assert_equal 2, raw_resource.timeout
  end

  def test_doesnt_blow_up_when_configured_half_open_timeout_but_adapter_doesnt_support
    resource = new_resource_timeout_breaker
    raw_resource = Object.new

    triggered = false
    resource.acquire(raw_resource) do
      triggered = true
    end

    assert triggered
  end

  class SomeErrorThatMarksCircuits < SomeError
    def marks_semian_circuits?
      true
    end
  end

  class SomeSubErrorThatDoesNotMarkCircuits < SomeErrorThatMarksCircuits
    def marks_semian_circuits?
      false
    end
  end

  def test_opens_circuit_when_error_has_marks_semian_circuits_equal_to_true
    2.times { trigger_error!(@resource, SomeErrorThatMarksCircuits) }
    assert_circuit_opened
  end

  def test_does_not_open_circuit_when_error_has_marks_semian_circuits_equal_to_false
    2.times { trigger_error!(@resource, SomeSubErrorThatDoesNotMarkCircuits) }
    assert_circuit_closed
  end

  def test_notify_state_transition
    name = :test_notify_state_transition

    events = []
    Semian.subscribe(:test_notify_state_transition) do |event, resource, _scope, _adapter, payload|
      if event == :state_change
        events << {name: resource.name, state: payload[:state]}
      end
    end

    # Construction alone announces the initial :closed state.
    resource = new_error_rate_breaker(name, error_percent_threshold: 0.6666, half_open_resource_timeout: 0.123)
    assert_equal(1, events.length)
    assert_equal(name, events[0][:name])
    assert_equal(:closed, events[0][:state])

    # A plain successful call changes nothing.
    resource.acquire { nil }
    assert_equal(1, events.length)

    # Enough failures push the breaker to :open.
    2.times { trigger_error!(resource) }
    assert_equal(2, events.length)
    assert_equal(name, events[1][:name])
    assert_equal(:open, events[1][:state])

    sleep_for_sec(1.1)

    # One successful probe yields two events: :half_open, then :closed.
    resource.acquire { nil }
    assert_equal(4, events.length)
    assert_equal(name, events[2][:name])
    assert_equal(:half_open, events[2][:state])
    assert_equal(name, events[3][:name])
    assert_equal(:closed, events[3][:state])
  ensure
    Semian.unsubscribe(:test_notify_state_transition)
  end

  private

  # Builds a breaker with the settings shared by every test in this class;
  # +overrides+ replaces individual keyword arguments.
  def new_error_rate_breaker(name = :testing, **overrides)
    defaults = {
      exceptions: [Exception],
      error_percent_threshold: 0.5,
      error_timeout: 1,
      window_size: 2,
      request_volume_threshold: 2,
      implementation: ::Semian::ThreadSafe,
      success_threshold: 1,
      half_open_resource_timeout: nil,
    }
    ::Semian::ErrorRateCircuitBreaker.new(name, **defaults.merge(overrides))
  end

  # Variant used by the half-open resource timeout tests.
  def new_resource_timeout_breaker
    new_error_rate_breaker(:resource_timeout, success_threshold: 2, half_open_resource_timeout: 0.123)
  end
end
# Registering with :error_percent_threshold must yield a resource that is
# stored in the Semian registry and is backed by an ErrorRateCircuitBreaker
# using the thread-safe state implementation.
def test_register_with_error_rate_circuitbreaker
  resource = Semian.register(
    :testing_error_rate,
    success_threshold: 1,
    error_percent_threshold: 0.2,
    request_volume_threshold: 1,
    window_size: 10,
    error_timeout: 5,
    circuit_breaker: true,
    bulkhead: false,
    thread_safety_disabled: false,
  )

  # assert(a, b) treats b as the failure message and never compares the two;
  # assert_equal actually checks the registry lookup.
  assert_equal resource, Semian[:testing_error_rate]
  assert_instance_of Semian::ThreadSafe::State, resource.circuit_breaker.state
  assert_instance_of Semian::ErrorRateCircuitBreaker, resource.circuit_breaker
end