diff --git a/lib/semian.rb b/lib/semian.rb index 7cdd504f..883e1db6 100644 --- a/lib/semian.rb +++ b/lib/semian.rb @@ -8,9 +8,11 @@ require 'semian/platform' require 'semian/resource' require 'semian/circuit_breaker' +require 'semian/error_rate_circuit_breaker' require 'semian/protected_resource' require 'semian/unprotected_resource' require 'semian/simple_sliding_window' +require 'semian/time_sliding_window' require 'semian/simple_integer' require 'semian/simple_state' require 'semian/lru_hash' @@ -126,6 +128,9 @@ def to_s # # +circuit_breaker+: The boolean if you want a circuit breaker acquired for your resource. Default true. # + # +circuit_breaker_type+: The string representing the type of circuit breaker, one of :normal or :error_rate + # Default normal (optional) + # # +bulkhead+: The boolean if you want a bulkhead to be acquired for your resource. Default true. # # +tickets+: Number of tickets. If this value is 0, the ticket count will not be set, @@ -153,6 +158,18 @@ def to_s # +exceptions+: An array of exception classes that should be accounted as resource errors. Default []. # (circuit breaker) # + # +error_percent_threshold+: The percentage of time spent making calls that ultimately ended in error + # that will trigger the circuit opening (error_rate circuit breaker required) + # + # +minimum_request_volume+: The number of calls that must happen within the time_window before the circuit + # will consider opening based on error_percent_threshold. For example, if the value is 20, then if only 19 requests + # are received in the rolling window the circuit will not trip open even if all 19 failed. + # Without this the circuit would open if the first request was an error (100% failure rate). + # (error_rate circuit breaker required) + # + # +time_window+: The time window in seconds over which the error rate will be calculated + # (error_rate circuit breaker required) + # # Returns the registered resource. def register(name, **options) circuit_breaker = create_circuit_breaker(name, **options) @@ -245,10 +262,34 @@ def thread_safe=(thread_safe) private + def create_error_rate_circuit_breaker(name, **options) + require_keys!([:success_threshold, :error_percent_threshold, :error_timeout, + :minimum_request_volume, :time_window], **options) + + exceptions = options[:exceptions] || [] + ErrorRateCircuitBreaker.new(name, + success_threshold: options[:success_threshold], + error_percent_threshold: options[:error_percent_threshold], + error_timeout: options[:error_timeout], + exceptions: Array(exceptions) + [::Semian::BaseError], + half_open_resource_timeout: options[:half_open_resource_timeout], + minimum_request_volume: options[:minimum_request_volume], + time_window: options[:time_window], + implementation: implementation(**options)) + end + def create_circuit_breaker(name, **options) circuit_breaker = options.fetch(:circuit_breaker, true) return unless circuit_breaker - require_keys!([:success_threshold, :error_threshold, :error_timeout], options) + + type = options.fetch(:circuit_breaker_type, :normal) + unless [:normal, :error_rate].include?(type) + raise ArgumentError, "Unknown 'circuit_breaker_type': #{type}, should be :normal or :error_rate" + end + + return create_error_rate_circuit_breaker(name, **options) if type == :error_rate + + require_keys!([:success_threshold, :error_threshold, :error_timeout], **options) exceptions = options[:exceptions] || [] CircuitBreaker.new( diff --git a/lib/semian/error_rate_circuit_breaker.rb b/lib/semian/error_rate_circuit_breaker.rb new file mode 100644 index 00000000..2b460935 --- /dev/null +++ b/lib/semian/error_rate_circuit_breaker.rb @@ -0,0 +1,191 @@ +module Semian + class ErrorRateCircuitBreaker #:nodoc: + extend Forwardable + + def_delegators :@state, :closed?, :open?, :half_open? + + attr_reader :name, :half_open_resource_timeout, :error_timeout, :state, :last_error, :error_percent_threshold, + :minimum_request_volume, :success_threshold, :exceptions + + def_delegator :@window, :time_window_ms + + def initialize(name, exceptions:, error_percent_threshold:, error_timeout:, time_window:, + minimum_request_volume:, success_threshold:, implementation:, + half_open_resource_timeout: nil, time_source: nil) + + raise 'error_threshold_percent should be between 0.0 and 1.0 exclusive' unless 0 < error_percent_threshold && error_percent_threshold < 1 + + @name = name.to_sym + @error_timeout = error_timeout + @exceptions = exceptions + @half_open_resource_timeout = half_open_resource_timeout + @error_percent_threshold = error_percent_threshold + @last_error_time = nil + @minimum_request_volume = minimum_request_volume + @success_threshold = success_threshold + @time_source = time_source ? time_source : -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) } + @window = implementation::TimeSlidingWindow.new(time_window, @time_source) + @state = implementation::State.new + + reset + end + + def acquire(resource = nil, &block) + return yield if disabled? + transition_to_half_open if transition_to_half_open? + + raise OpenCircuitError unless request_allowed? + + time_start = current_time + result = nil + begin + result = maybe_with_half_open_resource_timeout(resource, &block) + rescue *@exceptions => error + if !error.respond_to?(:marks_semian_circuits?) || error.marks_semian_circuits? + mark_failed(error, current_time - time_start) + end + raise error + else + mark_success(current_time - time_start) + end + result + end + + def transition_to_half_open? + open? && error_timeout_expired? && !half_open? + end + + def request_allowed? + closed? || half_open? || transition_to_half_open? + end + + def mark_failed(error, time_spent) + push_error(error, time_spent) + if closed? + transition_to_open if error_threshold_reached? + elsif half_open? + transition_to_open + end + end + + def mark_success(time_spent) + @window << [true, time_spent] + return unless half_open? + transition_to_close if success_threshold_reached? + end + + def reset + @last_error_time = nil + @window.clear + transition_to_close + end + + def destroy + @state.destroy + end + + def in_use? + return false if error_timeout_expired? + error_count > 0 + end + + private + + def current_time + @time_source.call + end + + def transition_to_close + notify_state_transition(:closed) + log_state_transition(:closed) + @state.close! + end + + def transition_to_open + notify_state_transition(:open) + log_state_transition(:open) + @state.open! + @window.clear + end + + def transition_to_half_open + notify_state_transition(:half_open) + log_state_transition(:half_open) + @state.half_open! + @window.clear + end + + def success_threshold_reached? + success_count >= @success_threshold + end + + def error_threshold_reached? + return false if @window.empty? || @window.length < @minimum_request_volume + success_time_spent, error_time_spent = calculate_time_spent + total_time = error_time_spent + success_time_spent + error_time_spent / total_time >= @error_percent_threshold + end + + def calculate_time_spent + @window.each_with_object([0.0, 0.0]) do |entry, sum| + if entry[0] == true + sum[0] = entry[1] + sum[0] + else + sum[1] = entry[1] + sum[1] + end + end + end + + def error_count + @window.count { |entry| entry[0] == false }.to_f + end + + def success_count + @window.count { |entry| entry[0] == true }.to_f + end + + def error_timeout_expired? + return false unless @last_error_time + current_time - @last_error_time >= @error_timeout * 1000 + end + + def push_error(error, time_spent) + @last_error = error + @last_error_time = current_time + @window << [false, time_spent] + end + + def log_state_transition(new_state) + return if @state.nil? || new_state == @state.value + + str = "[#{self.class.name}] State transition from #{@state.value} to #{new_state}." + str << " success_count=#{success_count} error_count=#{error_count}" + str << " success_count_threshold=#{@success_threshold} error_count_percent=#{@error_percent_threshold}" + str << " error_timeout=#{@error_timeout} error_last_at=\"#{@last_error_time}\"" + str << " minimum_request_volume=#{@minimum_request_volume} time_window_ms=#{@window.time_window_ms}" + str << " name=\"#{@name}\"" + Semian.logger.info(str) + end + + def notify_state_transition(new_state) + Semian.notify(:state_change, self, nil, nil, state: new_state) + end + + def disabled? + ENV['SEMIAN_CIRCUIT_BREAKER_DISABLED'] || ENV['SEMIAN_DISABLED'] + end + + def maybe_with_half_open_resource_timeout(resource, &block) + result = + if half_open? && @half_open_resource_timeout && resource.respond_to?(:with_resource_timeout) + resource.with_resource_timeout(@half_open_resource_timeout) do + block.call + end + else + block.call + end + + result + end + end +end diff --git a/lib/semian/time_sliding_window.rb b/lib/semian/time_sliding_window.rb new file mode 100644 index 00000000..e56c0eb2 --- /dev/null +++ b/lib/semian/time_sliding_window.rb @@ -0,0 +1,97 @@ +require 'thread' + +module Semian + module Simple + class TimeSlidingWindow #:nodoc: + extend Forwardable + + def_delegators :@window, :size, :empty?, :length + attr_reader :time_window_ms + + Pair = Struct.new(:head, :tail) + + # A sliding window is a structure that stores the most recent entries that were pushed within the last slice of time + def initialize(time_window, time_source = nil) + @time_window_ms = time_window * 1000 + @time_source = time_source ? time_source : -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) } + @window = [] + end + + def count(&block) + remove_old + vals = @window.map(&:tail) + vals.count(&block) + end + + def each_with_object(memo, &block) + remove_old + vals = @window.map(&:tail) + vals.each_with_object(memo, &block) + end + + def push(value) + remove_old # make room + @window << Pair.new(current_time, value) + self + end + + alias_method :<<, :push + + def clear + @window.clear + self + end + + def last + @window.last&.tail + end + + alias_method :destroy, :clear + + private + + def remove_old + return if @window.empty? + midtime = current_time - time_window_ms + # special case, everything is too old + @window.clear if @window.last.head < midtime + # otherwise we find the index position where the cutoff is + idx = (0...@window.size).bsearch { |n| @window[n].head >= midtime } + @window.slice!(0, idx) if idx + end + + def current_time + @time_source.call + end + end + end + + module ThreadSafe + class TimeSlidingWindow < Simple::TimeSlidingWindow + def initialize(*) + super + @lock = Mutex.new + end + + # #size, #last are not wrapped in a mutex. The worst-case is a + # thread-switch at a timing where they'd receive an + # out-of-date value--which could happen with a mutex as well. + + def count(*) + @lock.synchronize { super } + end + + def each_with_object(*) + @lock.synchronize { super } + end + + def push(*) + @lock.synchronize { super } + end + + def clear + @lock.synchronize { super } + end + end + end +end diff --git a/test/error_rate_circuit_breaker_test.rb b/test/error_rate_circuit_breaker_test.rb new file mode 100644 index 00000000..77c9577d --- /dev/null +++ b/test/error_rate_circuit_breaker_test.rb @@ -0,0 +1,361 @@ +require 'test_helper' + +class TestErrorRateCircuitBreaker < Minitest::Test + include CircuitBreakerHelper + + def setup + @strio = StringIO.new + Semian.logger = Logger.new @strio + begin + Semian.destroy(:testing) + rescue + nil + end + @resource = ::Semian::ErrorRateCircuitBreaker.new(:testing, + exceptions: [SomeError], + error_percent_threshold: 0.5, + error_timeout: 1, + time_window: 2, + minimum_request_volume: 2, + implementation: ::Semian::ThreadSafe, + success_threshold: 1, + half_open_resource_timeout: nil, + time_source: -> { Time.now.to_f * 1000 }) + Timecop.return + end + + def teardown + Timecop.return + end + + def half_open_circuit(resource = @resource) + Timecop.travel(-1.1) do + open_circuit!(resource) + assert_circuit_opened(resource) + end + assert resource.transition_to_half_open?, 'Expect breaker to be half-open' + end + + def test_error_threshold_must_be_between_0_and_1 + assert_raises RuntimeError do + ::Semian::ErrorRateCircuitBreaker.new(:testing, + exceptions: [SomeError], + error_percent_threshold: 1.0, + error_timeout: 1, + time_window: 2, + minimum_request_volume: 2, + implementation: ::Semian::ThreadSafe, + success_threshold: 1, + half_open_resource_timeout: nil) + end + + assert_raises RuntimeError do + ::Semian::ErrorRateCircuitBreaker.new(:testing, + exceptions: [SomeError], + error_percent_threshold: 0.0, + error_timeout: 1, + time_window: 2, + minimum_request_volume: 2, + implementation: ::Semian::ThreadSafe, + success_threshold: 1, + half_open_resource_timeout: nil) + end + end + + def test_acquire_yield_when_the_circuit_is_closed + block_called = false + @resource.acquire { block_called = true } + assert_equal true, block_called + end + + def test_acquire_raises_circuit_open_error_when_the_circuit_is_open + open_circuit! + assert_raises Semian::OpenCircuitError do + @resource.acquire { 1 + 1 } + end + assert_match(/State transition from closed to open/, @strio.string) + assert_match(/minimum_request_volume=2/, @strio.string) + assert_match(/time_window_ms=2000/, @strio.string) + assert_match(/error_count_percent=0\.5/, @strio.string) + end + + def test_after_error_threshold_the_circuit_is_open + open_circuit! + assert_circuit_opened + end + + def test_after_error_timeout_is_elapsed_requests_are_attempted_again + half_open_circuit + assert_circuit_closed + end + + def test_until_success_threshold_is_reached_a_single_error_will_reopen_the_circuit + half_open_circuit + trigger_error! + assert_circuit_opened + end + + def test_once_success_threshold_is_reached_only_error_threshold_will_open_the_circuit_again + half_open_circuit + assert_circuit_closed_elapse_time(@resource, 0.1) # one success + assert_circuit_closed_elapse_time(@resource, 0.1) # two success + trigger_error_elapse_time!(@resource, 0.11) # one failure + trigger_error_elapse_time!(@resource, 0.11) # two failures (>50%) + assert_circuit_opened + end + + def test_reset_allow_to_close_the_circuit_and_forget_errors + open_circuit! + @resource.reset + assert_match(/State transition from open to closed/, @strio.string) + assert_circuit_closed + end + + def test_errors_more_than_duration_apart_doesnt_open_circuit + # allow time for error to slide off time window + Timecop.travel(-2) do + trigger_error! + end + trigger_error! + assert_circuit_closed + end + + def test_errors_under_threshold_doesnt_open_circuit + # 60% success rate + Timecop.travel(-2) do + @resource.acquire do + Timecop.travel(-1) + 1 + 1 + end + + @resource.acquire do + Timecop.return + 1 + 1 + end + end + trigger_error! + trigger_error! + assert_circuit_closed + end + + def test_request_allowed_query_doesnt_trigger_transitions + Timecop.travel(-1.1) do + open_circuit! + refute_predicate @resource, :request_allowed? + assert_predicate @resource, :open? + end + assert_predicate @resource, :request_allowed? + assert_predicate @resource, :open? + end + + def test_open_close_open_cycle + resource = ::Semian::ErrorRateCircuitBreaker.new(:testing, + exceptions: [SomeError], + error_percent_threshold: 0.5, + error_timeout: 1, + time_window: 2, + minimum_request_volume: 2, + implementation: ::Semian::ThreadSafe, + success_threshold: 2, + half_open_resource_timeout: nil, + time_source: -> {Time.now.to_f * 1000}) + Timecop.travel(-1.1) do + open_circuit!(resource) + assert_circuit_opened(resource) + end + + assert_circuit_closed(resource) + + assert resource.half_open? + + assert_circuit_closed_elapse_time(resource, 0.1) + + assert resource.closed? + + open_circuit!(resource, 1, 1) + assert_circuit_opened(resource) + + Timecop.travel(1.1) do + assert_circuit_closed(resource) + + assert resource.half_open? + assert_circuit_closed(resource) + + assert resource.closed? + end + end + + def test_env_var_disables_circuit_breaker + ENV['SEMIAN_CIRCUIT_BREAKER_DISABLED'] = '1' + open_circuit! + assert_circuit_closed + ensure + ENV.delete('SEMIAN_CIRCUIT_BREAKER_DISABLED') + end + + def test_semian_wide_env_var_disables_circuit_breaker + ENV['SEMIAN_DISABLED'] = '1' + open_circuit! + assert_circuit_closed + ensure + ENV.delete('SEMIAN_DISABLED') + end + + class RawResource + def timeout + @timeout || 2 + end + + def with_resource_timeout(timeout) + prev_timeout = @timeout + @timeout = timeout + yield + ensure + @timeout = prev_timeout + end + end + + def test_changes_resource_timeout_when_configured + resource = ::Semian::ErrorRateCircuitBreaker.new(:resource_timeout, + exceptions: [SomeError], + error_percent_threshold: 0.5, + error_timeout: 1, + time_window: 2, + minimum_request_volume: 2, + implementation: ::Semian::ThreadSafe, + success_threshold: 2, + half_open_resource_timeout: 0.123, + time_source: -> { Time.now.to_f * 1000 }) + + half_open_circuit(resource) + assert_circuit_closed(resource) + assert resource.half_open? + + raw_resource = RawResource.new + + triggered = false + resource.acquire(raw_resource) do + triggered = true + assert_equal 0.123, raw_resource.timeout + end + + assert triggered + assert_equal 2, raw_resource.timeout + end + + def test_doesnt_change_resource_timeout_when_closed + resource = ::Semian::ErrorRateCircuitBreaker.new(:resource_timeout, + exceptions: [SomeError], + error_percent_threshold: 0.5, + error_timeout: 1, + time_window: 2, + minimum_request_volume: 2, + implementation: ::Semian::ThreadSafe, + success_threshold: 2, + half_open_resource_timeout: 0.123, + time_source: -> { Time.now.to_f * 1000 }) + + raw_resource = RawResource.new + + triggered = false + resource.acquire(raw_resource) do + triggered = true + assert_equal 2, raw_resource.timeout + end + + assert triggered + assert_equal 2, raw_resource.timeout + end + + def test_doesnt_blow_up_when_configured_half_open_timeout_but_adapter_doesnt_support + resource = ::Semian::ErrorRateCircuitBreaker.new(:resource_timeout, + exceptions: [SomeError], + error_percent_threshold: 0.5, + error_timeout: 1, + time_window: 2, + minimum_request_volume: 2, + implementation: ::Semian::ThreadSafe, + success_threshold: 2, + half_open_resource_timeout: 0.123, + time_source: -> { Time.now.to_f * 1000 }) + + raw_resource = Object.new + + triggered = false + resource.acquire(raw_resource) do + triggered = true + end + + assert triggered + end + + class SomeErrorThatMarksCircuits < SomeError + def marks_semian_circuits? + true + end + end + + class SomeSubErrorThatDoesNotMarkCircuits < SomeErrorThatMarksCircuits + def marks_semian_circuits? + false + end + end + + def test_opens_circuit_when_error_has_marks_semian_circuits_equal_to_true + 2.times { trigger_error!(@resource, SomeErrorThatMarksCircuits) } + assert_circuit_opened + end + + def test_does_not_open_circuit_when_error_has_marks_semian_circuits_equal_to_false + 2.times { trigger_error!(@resource, SomeSubErrorThatDoesNotMarkCircuits) } + assert_circuit_closed + end + + def test_notify_state_transition + name = :test_notify_state_transition + + events = [] + Semian.subscribe(:test_notify_state_transition) do |event, resource, _scope, _adapter, payload| + if event == :state_change + events << {name: resource.name, state: payload[:state]} + end + end + + # Creating a resource should generate a :closed notification. + resource = ::Semian::ErrorRateCircuitBreaker.new(name, + exceptions: [SomeError], + error_percent_threshold: 0.6666, + error_timeout: 1, + time_window: 2, + minimum_request_volume: 2, + implementation: ::Semian::ThreadSafe, + success_threshold: 1, + half_open_resource_timeout: 0.123, + time_source: -> { Time.now.to_f * 1000 }) + assert_equal(1, events.length) + assert_equal(name, events[0][:name]) + assert_equal(:closed, events[0][:state]) + + # Acquiring a resource doesn't generate a transition. + assert_circuit_closed_elapse_time(resource, 0.1) + assert_equal(1, events.length) + + # error_threshold_percent failures causes a transition to open. + 2.times { trigger_error_elapse_time!(resource, 0.12) } + assert_equal(2, events.length) + assert_equal(name, events[1][:name]) + assert_equal(:open, events[1][:state]) + + Timecop.travel(1.1) do + # Acquiring the resource successfully generates a transition to half_open, then closed. + resource.acquire { nil } + assert_equal(4, events.length) + assert_equal(name, events[2][:name]) + assert_equal(:half_open, events[2][:state]) + assert_equal(name, events[3][:name]) + assert_equal(:closed, events[3][:state]) + end + ensure + Semian.unsubscribe(:test_notify_state_transition) + end +end diff --git a/test/helpers/circuit_breaker_helper.rb b/test/helpers/circuit_breaker_helper.rb index 7d86ae0d..644ad468 100644 --- a/test/helpers/circuit_breaker_helper.rb +++ b/test/helpers/circuit_breaker_helper.rb @@ -3,8 +3,12 @@ module CircuitBreakerHelper private - def open_circuit!(resource = @resource, error_count = 2) - error_count.times { trigger_error!(resource) } + def open_circuit!(resource = @resource, error_count = 2, elapsed_time = nil) + if elapsed_time == nil + error_count.times { trigger_error!(resource) } + else + error_count.times { trigger_error_elapse_time!(resource, elapsed_time) } + end end def half_open_cicuit!(resource = @resource, backwards_time_travel = 10) @@ -18,6 +22,29 @@ def trigger_error!(resource = @resource, error = SomeError) rescue error end + def trigger_error_elapse_time!(resource = @resource, error = SomeError, elapsed_time) + Timecop.travel(-1 * elapsed_time) do + begin + resource.acquire { + Timecop.return_to_baseline + raise error + } + rescue error + end + end + end + + def assert_circuit_closed_elapse_time(resource = @resource, elapsed_time) + block_called = false + Timecop.travel(-1 * elapsed_time) do + resource.acquire do + Timecop.return_to_baseline + block_called = true + end + end + assert block_called, 'Expected the circuit to be closed, but it was open' + end + def assert_circuit_closed(resource = @resource) block_called = false resource.acquire { block_called = true } diff --git a/test/semian_test.rb b/test/semian_test.rb index 65d6958f..58856b50 100644 --- a/test/semian_test.rb +++ b/test/semian_test.rb @@ -68,6 +68,25 @@ def test_register_with_bulkhead_missing_options assert_equal exception.message, "Must pass exactly one of ticket or quota" end + def test_register_with_error_rate_circuitbreaker + resource = Semian.register( + :testing_error_rate, + circuit_breaker_type: :error_rate, + success_threshold: 1, + error_percent_threshold: 0.2, + minimum_request_volume: 1, + time_window: 10, + error_timeout: 5, + circuit_breaker: true, + bulkhead: false, + thread_safety_disabled: false, + ) + + assert resource, Semian[:testing_error_rate] + assert resource.circuit_breaker.state.instance_of?(Semian::ThreadSafe::State) + assert resource.circuit_breaker.instance_of?(Semian::ErrorRateCircuitBreaker) + end + def test_unsuported_constants assert defined?(Semian::BaseError) assert defined?(Semian::SyscallError) diff --git a/test/time_sliding_window_test.rb b/test/time_sliding_window_test.rb new file mode 100644 index 00000000..254c424c --- /dev/null +++ b/test/time_sliding_window_test.rb @@ -0,0 +1,75 @@ +require 'test_helper' + +class TestTimeSlidingWindow < Minitest::Test + def setup + @sliding_window = ::Semian::ThreadSafe::TimeSlidingWindow.new(0.5, -> { Time.now.to_f * 1000 }) # Timecop doesn't work with a monotonic clock + @sliding_window.clear + Timecop.freeze + end + + def teardown + @sliding_window.destroy + Timecop.return + end + + def test_sliding_window_push + assert_equal(0, @sliding_window.size) + @sliding_window << 1 + assert_sliding_window(@sliding_window, [1], 500) + @sliding_window << 5 + assert_sliding_window(@sliding_window, [1, 5], 500) + end + + def test_special_everything_too_old + @sliding_window << 0 << 1 + Timecop.travel(0.501) do + assert_sliding_window(@sliding_window, [], 500) + end + end + + def test_sliding_window_edge_falloff + assert_equal(0, @sliding_window.size) + @sliding_window << 0 << 1 << 2 << 3 << 4 << 5 << 6 << 7 + assert_sliding_window(@sliding_window, [0, 1, 2, 3, 4, 5, 6, 7], 500) + Timecop.travel(0.251) do + @sliding_window << 8 << 9 << 10 + end + + Timecop.travel(0.251 * 2) do + assert_sliding_window(@sliding_window, [8, 9, 10], 500) + @sliding_window << 11 + end + Timecop.travel(0.251 * 3) do + assert_sliding_window(@sliding_window, [11], 500) + end + end + + def test_sliding_window_count + @sliding_window << true << false << true << false << true << true << true + assert_equal(5, @sliding_window.count { |e| e == true }) + assert_equal(2, @sliding_window.count { |e| e == false }) + end + + def test_each_with_object + assert_equal(0, @sliding_window.size) + @sliding_window << [false, 1] << [false, 2] << [true, 1] << [true, 3] + result = @sliding_window.each_with_object([0.0, 0.0]) do |entry, sum| + if entry[0] == true + sum[0] = entry[1] + sum[0] + else + sum[1] = entry[1] + sum[1] + end + end + + assert_equal([4.0, 3.0], result) + end + + private + + def assert_sliding_window(sliding_window, array, time_window_millis) + # each_with_object will remove old entries first + data = sliding_window.each_with_object([]) { |v, data| data.append(v) } + assert_equal(array, data) + assert_equal(time_window_millis, sliding_window.time_window_ms) + end +end