Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error rate circuit breaker #264

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
20 changes: 20 additions & 0 deletions lib/semian.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
require 'semian/platform'
require 'semian/resource'
require 'semian/circuit_breaker'
require 'semian/error_rate_circuit_breaker'
require 'semian/protected_resource'
require 'semian/unprotected_resource'
require 'semian/simple_sliding_window'
require 'semian/time_sliding_window'
require 'semian/simple_integer'
require 'semian/simple_state'
require 'semian/lru_hash'
Expand Down Expand Up @@ -245,9 +247,27 @@ def thread_safe=(thread_safe)

private

def create_error_rate_circuit_breaker(name, **options)
require_keys!([:success_threshold, :error_percent_threshold, :error_timeout,
:request_volume_threshold, :window_size], options)

exceptions = options[:exceptions] || []
ErrorRateCircuitBreaker.new(name,
success_threshold: options[:success_threshold],
error_percent_threshold: options[:error_percent_threshold],
error_timeout: options[:error_timeout],
exceptions: Array(exceptions) + [::Semian::BaseError],
half_open_resource_timeout: options[:half_open_resource_timeout],
request_volume_threshold: options[:request_volume_threshold],
window_size: options[:window_size],
implementation: implementation(**options))
end

def create_circuit_breaker(name, **options)
circuit_breaker = options.fetch(:circuit_breaker, true)
return unless circuit_breaker
return create_error_rate_circuit_breaker(name, **options) if options.key?(:error_percent_threshold)

require_keys!([:success_threshold, :error_threshold, :error_timeout], options)

exceptions = options[:exceptions] || []
Expand Down
166 changes: 166 additions & 0 deletions lib/semian/error_rate_circuit_breaker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
module Semian
class ErrorRateCircuitBreaker #:nodoc:
damianthe marked this conversation as resolved.
Show resolved Hide resolved
extend Forwardable

def_delegators :@state, :closed?, :open?, :half_open?

attr_reader :name, :half_open_resource_timeout, :error_timeout, :state, :last_error, :error_percent_threshold,
:request_volume_threshold, :success_count_threshold

def initialize(name, exceptions:, error_percent_threshold:, error_timeout:, window_size:,
request_volume_threshold:, success_threshold:, implementation:, half_open_resource_timeout: nil)
damianthe marked this conversation as resolved.
Show resolved Hide resolved

raise 'error_threshold_percent should be between 0.0 and 1.0 exclusive' unless (0.0001...1.0).cover?(error_percent_threshold)
damianthe marked this conversation as resolved.
Show resolved Hide resolved

@name = name.to_sym
@error_timeout = error_timeout
@exceptions = exceptions
@half_open_resource_timeout = half_open_resource_timeout
@error_percent_threshold = error_percent_threshold
@last_error_time = nil
@request_volume_threshold = request_volume_threshold
@success_count_threshold = success_threshold

@window = implementation::TimeSlidingWindow.new(window_size)
@state = implementation::State.new

reset
end

def acquire(resource = nil, &block)
return yield if disabled?
transition_to_half_open if transition_to_half_open?

raise OpenCircuitError unless request_allowed?

result = nil
begin
result = maybe_with_half_open_resource_timeout(resource, &block)
rescue *@exceptions => error
if !error.respond_to?(:marks_semian_circuits?) || error.marks_semian_circuits?
mark_failed(error)
end
raise error
else
mark_success
end
result
end

def transition_to_half_open?
open? && error_timeout_expired? && !half_open?
end

def request_allowed?
closed? || half_open? || transition_to_half_open?
end

def mark_failed(error)
push_error(error)
if closed?
transition_to_open if error_threshold_reached?
damianthe marked this conversation as resolved.
Show resolved Hide resolved
elsif half_open?
transition_to_open
end
end

def mark_success
@window << true
return unless half_open?
transition_to_close if success_threshold_reached?
end

def reset
@last_error_time = nil
@window.clear
transition_to_close
end

def destroy
@state.destroy
end

def in_use?
return false if error_timeout_expired?
@window.count(false) > 0
end

private

def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def transition_to_close
notify_state_transition(:closed)
log_state_transition(:closed)
@state.close!
@window.clear
end

def transition_to_open
notify_state_transition(:open)
log_state_transition(:open)
@state.open!
end

def transition_to_half_open
notify_state_transition(:half_open)
log_state_transition(:half_open)
@state.half_open!
@window.clear
end

def success_threshold_reached?
@window.count(true) >= @success_count_threshold
end

def error_threshold_reached?
return false if @window.empty? or @window.length < @request_volume_threshold
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

request_volume_threshold is an interesting parameter because its desired value varies depending on the volume of requests.

If the volume of requests happening in window_size ever drops bellow request_volume_threshold, the circuit will never open.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@damianthe damianthe Apr 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I can see how this is useful now. If you configure it to be above time_window / (timeout * request_volume_threshold) then it acts as a damper for the problem I was describing in #264 (comment)

@window.count(false).to_f / @window.length.to_f >= @error_percent_threshold
end

def error_timeout_expired?
return false unless @last_error_time
current_time - @last_error_time >= @error_timeout
end

def push_error(error)
@last_error = error
@last_error_time = current_time
@window << false
end

def log_state_transition(new_state)
return if @state.nil? || new_state == @state.value

str = "[#{self.class.name}] State transition from #{@state.value} to #{new_state}."
str << " success_count=#{@window.count(true)} error_count=#{@window.count(false)}"
str << " success_count_threshold=#{@success_count_threshold} error_count_percent=#{@error_percent_threshold}"
str << " error_timeout=#{@error_timeout} error_last_at=\"#{@last_error_time}\""
str << " name=\"#{@name}\""
Semian.logger.info(str)
end

def notify_state_transition(new_state)
Semian.notify(:state_change, self, nil, nil, state: new_state)
end

def disabled?
ENV['SEMIAN_CIRCUIT_BREAKER_DISABLED'] || ENV['SEMIAN_DISABLED']
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requires rethinking of changes in master branch. Idea to not use ENV inside business logic, only during the configuration phase.

end

def maybe_with_half_open_resource_timeout(resource, &block)
result =
if half_open? && @half_open_resource_timeout && resource.respond_to?(:with_resource_timeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use two spaces for indentation on this block (rather than 4)

resource.with_resource_timeout(@half_open_resource_timeout) do
block.call
end
else
block.call
end

result
end
end
end
88 changes: 88 additions & 0 deletions lib/semian/time_sliding_window.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
require 'thread'

module Semian
module Simple

class TimeSlidingWindow #:nodoc:
extend Forwardable

def_delegators :@window, :size, :empty?, :length
attr_reader :time_window_millis

Pair = Struct.new(:head, :tail)

# A sliding window is a structure that stores the most recent entries that were pushed within the last slice of time
def initialize(time_window)
@time_window_millis = time_window * 1000
@window = []
end

def count(arg)
vals = @window.map { |pair| pair.tail}
vals.count(arg)
end

def push(value)
remove_old # make room
@window << Pair.new(current_time, value)
self
end

alias_method :<<, :push

def clear
@window.clear
self
end

def last
@window.last&.tail
end

def remove_old
midtime = current_time - time_window_millis
# special case, everything is too old
@window.clear if !@window.empty? and @window.last.head < midtime
# otherwise we find the index position where the cutoff is
idx = (0...@window.size).bsearch { |n| @window[n].head >= midtime }
damianthe marked this conversation as resolved.
Show resolved Hide resolved
@window.slice!(0, idx) if idx
end

alias_method :destroy, :clear

private

def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
end
end
end

module ThreadSafe
class TimeSlidingWindow < Simple::TimeSlidingWindow
def initialize(*)
super
@lock = Mutex.new
end

# #size, #last, and #clear are not wrapped in a mutex. For the first two,
# the worst-case is a thread-switch at a timing where they'd receive an
# out-of-date value--which could happen with a mutex as well.
#
# As for clear, it's an all or nothing operation. Doesn't matter if we
# have the lock or not.

def count(*)
@lock.synchronize { super }
end

def remove_old
@lock.synchronize { super }
end

def push(*)
@lock.synchronize { super }
end
end
end
end