From 2c6a1a9b44631f3dd842af1348ec78d19463e6c0 Mon Sep 17 00:00:00 2001 From: Ernesto Barajas Date: Wed, 29 Oct 2025 12:38:10 -0500 Subject: [PATCH 1/4] wip --- ruby/lib/ci/queue.rb | 22 -- ruby/lib/ci/queue/common.rb | 22 ++ ruby/lib/ci/queue/redis.rb | 1 + ruby/lib/ci/queue/redis/moving_average.rb | 65 +++++ ruby/lib/ci/queue/redis/test_time_record.rb | 6 + ruby/lib/ci/queue/redis/worker.rb | 2 +- ruby/lib/ci/queue/strategy/base.rb | 10 +- ruby/lib/ci/queue/strategy/random.rb | 4 +- .../ci/queue/strategy/suite_bin_packing.rb | 76 +++--- ruby/lib/ci/queue/strategy/timing_based.rb | 6 +- .../ci/queue/redis/dynamic_timeout_test.rb | 28 +-- .../ci/queue/redis/moving_average_test.rb | 166 ++++++++++++ ruby/test/ci/queue/redis/worker_chunk_test.rb | 14 +- ruby/test/ci/queue/redis_test.rb | 236 +++++++++++++++++- .../queue/strategy/suite_bin_packing_test.rb | 22 +- 15 files changed, 588 insertions(+), 92 deletions(-) create mode 100644 ruby/lib/ci/queue/redis/moving_average.rb create mode 100644 ruby/test/ci/queue/redis/moving_average_test.rb diff --git a/ruby/lib/ci/queue.rb b/ruby/lib/ci/queue.rb index ed17ce58..a38aaf61 100644 --- a/ruby/lib/ci/queue.rb +++ b/ruby/lib/ci/queue.rb @@ -42,15 +42,6 @@ def requeueable?(test_result) requeueable.nil? || requeueable.call(test_result) end - def shuffle(tests, random, config: nil) - if shuffler - shuffler.call(tests, random) - else - strategy = get_strategy(config&.strategy) - strategy.order_tests(tests, random: random, config: config) - end - end - def from_uri(url, config) uri = URI(url) implementation = case uri.scheme @@ -66,18 +57,5 @@ def from_uri(url, config) end implementation.from_uri(uri, config) end - - private - - def get_strategy(strategy_name) - case strategy_name&.to_sym - when :timing_based - Strategy::TimingBased.new - when :suite_bin_packing - Strategy::SuiteBinPacking.new - else - Strategy::Random.new - end - end end end diff --git a/ruby/lib/ci/queue/common.rb b/ruby/lib/ci/queue/common.rb index da41fe21..816db6fe 100644 --- a/ruby/lib/ci/queue/common.rb +++ b/ruby/lib/ci/queue/common.rb @@ -36,6 +36,28 @@ def rescue_connection_errors(handler = ->(err) { nil }) rescue *self::class::CONNECTION_ERRORS => err handler.call(err) end + + def ordering_strategy + case config.strategy.to_sym + when :timing_based + Strategy::TimingBased.new(config) + when :suite_bin_packing + # pass redis if available + # need to think about a better way to structure queue/strategy interaction + redis_instance = if self.respond_to?(:redis, true) # include private methods + self.send(:redis) + else + nil + end + Strategy::SuiteBinPacking.new(config, redis: redis_instance) + else + Strategy::Random.new(config) + end + end + + def reorder_tests(tests, random: Random.new) + ordering_strategy.order_tests(tests, random: random) + end end end end diff --git a/ruby/lib/ci/queue/redis.rb b/ruby/lib/ci/queue/redis.rb index 526c4aed..3642a117 100644 --- a/ruby/lib/ci/queue/redis.rb +++ b/ruby/lib/ci/queue/redis.rb @@ -10,6 +10,7 @@ require 'ci/queue/redis/supervisor' require 'ci/queue/redis/grind_supervisor' require 'ci/queue/redis/test_time_record' +require 'ci/queue/redis/moving_average' module CI module Queue diff --git a/ruby/lib/ci/queue/redis/moving_average.rb b/ruby/lib/ci/queue/redis/moving_average.rb new file mode 100644 index 00000000..501bb8bb --- /dev/null +++ b/ruby/lib/ci/queue/redis/moving_average.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +module CI + module Queue + module Redis + # Represents a redis hash of moving averages for test durations + # + # Moving average is calculated using exponential moving average formula + class MovingAverage + LUA_SCRIPT= <<~LUA + local hash_key = KEYS[1] + local test_id = ARGV[1] + local new_duration = tonumber(ARGV[2]) + local smoothing = tonumber(ARGV[3]) + local current_avg = redis.call('HGET', hash_key, test_id) + if current_avg then + current_avg = tonumber(current_avg) + local new_avg = smoothing * new_duration + (1 - smoothing) * current_avg + redis.call('HSET', hash_key, test_id, new_avg) + return tostring(new_avg) + else + redis.call('HSET', hash_key, test_id, new_duration) + return tostring(new_duration) + end + LUA + + def initialize(redis, key: "test_duration_moving_averages", smoothing_factor: 0.2) + @redis = redis + @key = key + @smoothing_factor = smoothing_factor + @values = {} + end + + def [](test_id) + load_all if @values.empty? + @values[test_id] + end + + def update(test_id, duration) + new_avg = @redis.eval(LUA_SCRIPT, keys: [@key], argv: [test_id, duration, @smoothing_factor]) + @values[test_id] = new_avg.to_f + new_avg.to_f + end + + def load_all + batch_size = 1000 + cursor = '0' + @values = {} + + loop do + cursor, batch = @redis.hscan(@key, cursor, count: batch_size) + batch.each do |test_id, value| + @values[test_id] = value.to_f + end + break if cursor == '0' + end + end + + def size + @redis.hlen(@key) + end + end + end + end +end diff --git a/ruby/lib/ci/queue/redis/test_time_record.rb b/ruby/lib/ci/queue/redis/test_time_record.rb index 439b1b3f..a45dd2b7 100644 --- a/ruby/lib/ci/queue/redis/test_time_record.rb +++ b/ruby/lib/ci/queue/redis/test_time_record.rb @@ -5,7 +5,9 @@ module Redis class TestTimeRecord < Worker def record(test_name, duration) record_test_time(test_name, duration) + record_test_duration_moving_average(test_name, duration) record_test_name(test_name) + end def fetch @@ -29,6 +31,10 @@ def record_test_time(test_name, duration) nil end + def record_test_duration_moving_average(test_name, duration) + MovingAverage.new(redis).update(test_name, duration) + end + def record_test_name(test_name) redis.pipelined do |pipeline| pipeline.lpush( diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 454db50b..119777a4 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -30,7 +30,7 @@ def distributed? def populate(tests, random: Random.new) @index = tests.map { |t| [t.id, t] }.to_h - executables = Queue.shuffle(tests, random, config: config) + executables = reorder_tests(tests, random: random) # Separate chunks from individual tests chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } diff --git a/ruby/lib/ci/queue/strategy/base.rb b/ruby/lib/ci/queue/strategy/base.rb index 98b3cf40..433c9c52 100644 --- a/ruby/lib/ci/queue/strategy/base.rb +++ b/ruby/lib/ci/queue/strategy/base.rb @@ -4,10 +4,16 @@ module CI module Queue module Strategy class Base - def order_tests(tests, random: Random.new, config: nil) + def initialize(config) + @config = config + end + + attr_reader :config + + def order_tests(tests) raise NotImplementedError, "#{self.class} must implement #order_tests" end end end end -end \ No newline at end of file +end diff --git a/ruby/lib/ci/queue/strategy/random.rb b/ruby/lib/ci/queue/strategy/random.rb index bf8ca728..0e9ff4c9 100644 --- a/ruby/lib/ci/queue/strategy/random.rb +++ b/ruby/lib/ci/queue/strategy/random.rb @@ -5,10 +5,10 @@ module CI module Queue module Strategy class Random < Base - def order_tests(tests, random: Random.new, config: nil) + def order_tests(tests, random: Random.new) tests.sort.shuffle(random: random) end end end end -end \ No newline at end of file +end diff --git a/ruby/lib/ci/queue/strategy/suite_bin_packing.rb b/ruby/lib/ci/queue/strategy/suite_bin_packing.rb index e07fa669..ff16788d 100644 --- a/ruby/lib/ci/queue/strategy/suite_bin_packing.rb +++ b/ruby/lib/ci/queue/strategy/suite_bin_packing.rb @@ -6,12 +6,36 @@ module CI module Queue module Strategy class SuiteBinPacking < Base - def order_tests(tests, random: Random.new, config: nil) - timing_data = load_timing_data(config&.timing_file) - max_duration = config&.suite_max_duration || 120_000 - fallback_duration = config&.timing_fallback_duration || 100.0 - buffer_percent = config&.suite_buffer_percent || 10 + class << self + def load_timing_data(file_path) + return {} unless file_path && ::File.exist?(file_path) + + JSON.parse(::File.read(file_path)) + rescue JSON::ParserError => e + warn "Warning: Could not parse timing file #{file_path}: #{e.message}" + {} + end + end + + def initialize(config, redis: nil) + super(config) + + if redis + @moving_average = CI::Queue::Redis::MovingAverage.new(redis) + end + + if config&.timing_file + @timing_data = self.class.load_timing_data(config.timing_file) + else + @timing_data = {} + end + @max_duration = config&.suite_max_duration || 120_000 + @fallback_duration = config&.timing_fallback_duration || 100.0 + @buffer_percent = config&.suite_buffer_percent || 10 + end + + def order_tests(tests, random: ::Random.new, redis: nil) # Group tests by suite name suites = tests.group_by { |test| extract_suite_name(test.id) } @@ -22,10 +46,6 @@ def order_tests(tests, random: Random.new, config: nil) create_chunks_for_suite( suite_name, suite_tests, - max_duration, - buffer_percent, - timing_data, - fallback_duration ) ) end @@ -40,27 +60,27 @@ def extract_suite_name(test_id) test_id.split('#').first end - def load_timing_data(file_path) - return {} unless file_path && ::File.exist?(file_path) - - JSON.parse(::File.read(file_path)) - rescue JSON::ParserError => e - warn "Warning: Could not parse timing file #{file_path}: #{e.message}" - {} - end + def get_test_duration(test_id) + if @moving_average + avg = @moving_average[test_id] + return avg if avg + end - def get_test_duration(test_id, timing_data, fallback_duration) - timing_data[test_id]&.to_f || fallback_duration + if @timing_data.key?(test_id) + @timing_data[test_id] + else + @fallback_duration + end end - def create_chunks_for_suite(suite_name, suite_tests, max_duration, buffer_percent, timing_data, fallback_duration) + def create_chunks_for_suite(suite_name, suite_tests) # Calculate total suite duration total_duration = suite_tests.sum do |test| - get_test_duration(test.id, timing_data, fallback_duration) + get_test_duration(test.id) end # If suite fits in max duration, create full_suite chunk - if total_duration <= max_duration + if total_duration <= @max_duration chunk_id = "#{suite_name}:full_suite" # Don't store test_ids in Redis - worker will resolve from index # But pass test_count for timeout calculation @@ -71,20 +91,16 @@ def create_chunks_for_suite(suite_name, suite_tests, max_duration, buffer_percen split_suite_into_chunks( suite_name, suite_tests, - max_duration, - buffer_percent, - timing_data, - fallback_duration ) end - def split_suite_into_chunks(suite_name, suite_tests, max_duration, buffer_percent, timing_data, fallback_duration) + def split_suite_into_chunks(suite_name, suite_tests) # Apply buffer to max duration - effective_max = max_duration * (1 - buffer_percent / 100.0) + effective_max = @max_duration * (1 - @buffer_percent / 100.0) # Sort tests by duration (longest first for better bin packing) sorted_tests = suite_tests.sort_by do |test| - -get_test_duration(test.id, timing_data, fallback_duration) + -get_test_duration(test.id) end # First-fit decreasing bin packing @@ -94,7 +110,7 @@ def split_suite_into_chunks(suite_name, suite_tests, max_duration, buffer_percen chunk_index = 0 sorted_tests.each do |test| - test_duration = get_test_duration(test.id, timing_data, fallback_duration) + test_duration = get_test_duration(test.id) if current_chunk_duration + test_duration > effective_max && current_chunk_tests.any? # Finalize current chunk and start new one diff --git a/ruby/lib/ci/queue/strategy/timing_based.rb b/ruby/lib/ci/queue/strategy/timing_based.rb index 4a2b13bb..664350f0 100644 --- a/ruby/lib/ci/queue/strategy/timing_based.rb +++ b/ruby/lib/ci/queue/strategy/timing_based.rb @@ -6,7 +6,7 @@ module CI module Queue module Strategy class TimingBased < Base - def order_tests(tests, random: Random.new, config: nil) + def order_tests(tests, random: Random.new) timing_data = load_timing_data(config&.timing_file) fallback_duration = config&.timing_fallback_duration || 100.0 @@ -20,7 +20,7 @@ def order_tests(tests, random: Random.new, config: nil) def load_timing_data(file_path) return {} unless file_path && ::File.exist?(file_path) - + JSON.parse(::File.read(file_path)) rescue JSON::ParserError => e warn "Warning: Could not parse timing file #{file_path}: #{e.message}" @@ -32,4 +32,4 @@ def load_timing_data(file_path) end end end -end \ No newline at end of file +end diff --git a/ruby/test/ci/queue/redis/dynamic_timeout_test.rb b/ruby/test/ci/queue/redis/dynamic_timeout_test.rb index f8770192..3edbab64 100644 --- a/ruby/test/ci/queue/redis/dynamic_timeout_test.rb +++ b/ruby/test/ci/queue/redis/dynamic_timeout_test.rb @@ -29,7 +29,7 @@ def test_chunk_timeout_stored_in_redis_hash CI::Queue::TestChunk.new('TestA:full_suite', 'TestA', :full_suite, [], 5000.0, test_count: 3) ] - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end @@ -45,8 +45,8 @@ def test_chunk_timeout_scales_with_test_count small_tests = create_mock_tests((1..5).map { |i| "SmallSuite#test_#{i}" }) small_chunk = CI::Queue::TestChunk.new('SmallSuite:full_suite', 'SmallSuite', :full_suite, [], 1000.0, test_count: 5) - CI::Queue.stub(:shuffle, [small_chunk]) do - worker = CI::Queue::Redis.new(@redis_url, @config) + worker = CI::Queue::Redis.new(@redis_url, @config) + worker.stub(:reorder_tests, [small_chunk]) do worker.populate(small_tests) end @@ -59,8 +59,8 @@ def test_chunk_timeout_scales_with_test_count large_tests = create_mock_tests((1..20).map { |i| "LargeSuite#test_#{i}" }) large_chunk = CI::Queue::TestChunk.new('LargeSuite:full_suite', 'LargeSuite', :full_suite, [], 5000.0, test_count: 20) - CI::Queue.stub(:shuffle, [large_chunk]) do - worker = CI::Queue::Redis.new(@redis_url, @config) + worker = CI::Queue::Redis.new(@redis_url, @config) + worker.stub(:reorder_tests, [large_chunk]) do worker.populate(large_tests) end @@ -81,7 +81,7 @@ def test_multiple_chunks_stored_with_different_timeouts CI::Queue::TestChunk.new('TestC:full_suite', 'TestC', :full_suite, [], 1000.0, test_count: 1), ] - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end @@ -97,7 +97,7 @@ def test_timeout_hash_has_ttl CI::Queue::TestChunk.new('TestA:full_suite', 'TestA', :full_suite, [], 1000.0, test_count: 2) ] - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end @@ -110,7 +110,7 @@ def test_single_test_not_in_timeout_hash tests = create_mock_tests(['TestA#test_1', 'TestB#test_1']) # Return individual tests, not chunks - CI::Queue.stub(:shuffle, tests) do + @worker.stub(:reorder_tests, tests) do @worker.populate(tests) end @@ -132,7 +132,7 @@ def test_mixed_chunks_and_tests_only_chunks_have_timeouts CI::Queue::TestChunk.new('TestC:full_suite', 'TestC', :full_suite, [], 3000.0, test_count: 3), ] - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end @@ -147,7 +147,7 @@ def test_mixed_chunks_and_tests_only_chunks_have_timeouts def test_reserve_test_passes_dynamic_deadline_flag tests = create_mock_tests(['TestA#test_1']) - CI::Queue.stub(:shuffle, tests) do + @worker.stub(:reorder_tests, tests) do @worker.populate(tests) end @@ -177,7 +177,7 @@ def test_reserve_test_passes_dynamic_deadline_flag def test_reserve_lost_test_passes_dynamic_deadline_flag tests = create_mock_tests(['TestA#test_1']) - CI::Queue.stub(:shuffle, tests) do + @worker.stub(:reorder_tests, tests) do @worker.populate(tests) end @@ -218,7 +218,7 @@ def test_chunk_not_marked_lost_before_dynamic_timeout tests = create_mock_tests((1..5).map { |i| "TestSuite#test_#{i}" }) chunk = CI::Queue::TestChunk.new('TestSuite:full_suite', 'TestSuite', :full_suite, [], 5000.0, test_count: 5) - CI::Queue.stub(:shuffle, [chunk]) do + worker1.stub(:reorder_tests, [chunk]) do worker1.populate(tests) end @@ -251,7 +251,7 @@ def test_single_test_marked_lost_after_default_timeout # Populate with single test (no chunk) tests = create_mock_tests(['TestA#test_1']) - CI::Queue.stub(:shuffle, tests) do + worker1.stub(:reorder_tests, tests) do worker1.populate(tests) end @@ -278,7 +278,7 @@ def test_batching_with_many_chunks CI::Queue::TestChunk.new("TestSuite#{i}:full_suite", "TestSuite#{i}", :full_suite, [], 1000.0, test_count: 1) end - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end diff --git a/ruby/test/ci/queue/redis/moving_average_test.rb b/ruby/test/ci/queue/redis/moving_average_test.rb new file mode 100644 index 00000000..d2f1ef5b --- /dev/null +++ b/ruby/test/ci/queue/redis/moving_average_test.rb @@ -0,0 +1,166 @@ +# frozen_string_literal: true +require 'test_helper' + +class CI::Queue::Redis::MovingAverageTest < Minitest::Test + def setup + @redis_url = ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') + @redis = Redis.new(url: @redis_url) + @redis.flushdb + @key = 'test:moving_averages' + end + + def teardown + @redis.flushdb + @redis.close + end + + def test_initialize + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + assert_equal 0, ma.size + end + + def test_initialize_with_custom_smoothing_factor + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key, smoothing_factor: 0.5) + ma.update('test1', 100.0) + first_avg = ma['test1'] + assert_equal 100.0, first_avg + + ma.update('test1', 200.0) + second_avg = ma['test1'] + assert_in_delta 150.0, second_avg, 0.001 + end + + def test_update_creates_new_entry + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + result = ma.update('test1', 10.5) + + assert_equal 10.5, result + assert_equal 1, ma.size + end + + def test_update_calculates_exponential_moving_average + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key, smoothing_factor: 0.2) + + first_avg = ma.update('test1', 100.0) + assert_equal 100.0, first_avg + + # EMA = 0.2 * 150 + 0.8 * 100 = 30 + 80 = 110 + second_avg = ma.update('test1', 150.0) + assert_in_delta 110.0, second_avg, 0.001 + + third_avg = ma.update('test1', 200.0) + assert_in_delta 128.0, third_avg, 0.001 + end + + def test_update_multiple_tests + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + + ma.update('test1', 10.0) + ma.update('test2', 20.0) + ma.update('test3', 30.0) + + assert_equal 3, ma.size + end + + def test_bracket_operator_loads_and_returns_value + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + ma.update('test1', 10.5) + ma.update('test2', 20.5) + + ma2 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + assert_equal 10.5, ma2['test1'] + assert_equal 20.5, ma2['test2'] + end + + def test_bracket_operator_returns_nil_for_missing_key + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + ma.update('test1', 10.5) + + assert_nil ma['nonexistent'] + end + + def test_load_all_loads_all_values_from_redis + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + ma.update('test1', 10.0) + ma.update('test2', 20.0) + ma.update('test3', 30.0) + + ma2 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + ma2.load_all + + assert_equal 10.0, ma2['test1'] + assert_equal 20.0, ma2['test2'] + assert_equal 30.0, ma2['test3'] + end + + def test_load_all_handles_batches + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + + 1500.times do |i| + ma.update("test_#{i}", i.to_f) + end + + # Create new instance and load all + ma2 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + ma2.load_all + + assert_equal 1500, ma2.size + assert_equal 0.0, ma2['test_0'] + assert_equal 500.0, ma2['test_500'] + assert_equal 1499.0, ma2['test_1499'] + end + + def test_size_returns_correct_count + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + + assert_equal 0, ma.size + + ma.update('test1', 10.0) + assert_equal 1, ma.size + + ma.update('test2', 20.0) + assert_equal 2, ma.size + + ma.update('test1', 15.0) + assert_equal 2, ma.size + end + + def test_updates_persist_to_redis + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + ma.update('test1', 10.5) + ma.update('test2', 20.5) + + # Verify data is actually in Redis + values = @redis.hgetall(@key) + assert_equal 2, values.size + assert_equal '10.5', values['test1'] + assert_equal '20.5', values['test2'] + end + + def test_concurrent_updates_from_different_instances + ma1 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key, smoothing_factor: 0.2) + ma2 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key, smoothing_factor: 0.2) + + ma1.update('test1', 100.0) + ma2.update('test1', 200.0) + + # Expected: 0.2 * 200 + 0.8 * 100 = 120 + assert_in_delta 120.0, ma2['test1'], 0.001 + end + + def test_handles_floating_point_precision + ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + + # Test with various floating point values + ma.update('test1', 0.123456789) + ma.update('test2', 999.999999) + ma.update('test3', 0.000001) + + # Load in new instance + ma2 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + + assert_in_delta 0.123456789, ma2['test1'], 0.000001 + assert_in_delta 999.999999, ma2['test2'], 0.000001 + assert_in_delta 0.000001, ma2['test3'], 0.000001 + end +end diff --git a/ruby/test/ci/queue/redis/worker_chunk_test.rb b/ruby/test/ci/queue/redis/worker_chunk_test.rb index e9e383f5..62152a63 100644 --- a/ruby/test/ci/queue/redis/worker_chunk_test.rb +++ b/ruby/test/ci/queue/redis/worker_chunk_test.rb @@ -30,7 +30,7 @@ def test_populate_stores_chunk_metadata_in_redis ] # Simulate strategy returning chunks - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end @@ -51,7 +51,7 @@ def test_populate_stores_partial_suite_with_test_ids CI::Queue::TestChunk.new('TestA:chunk_0', 'TestA', :partial_suite, test_ids, 3000.0) ] - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end @@ -87,7 +87,7 @@ def test_resolve_full_suite_chunk CI::Queue::TestChunk.new('TestA:full_suite', 'TestA', :full_suite, [], 3000.0) ] - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end @@ -108,7 +108,7 @@ def test_resolve_partial_suite_chunk CI::Queue::TestChunk.new('TestA:chunk_0', 'TestA', :partial_suite, test_ids, 2000.0) ] - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end @@ -178,7 +178,7 @@ def test_populate_with_mixed_chunks_and_tests tests[1] # Individual test ] - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end @@ -194,7 +194,7 @@ def test_chunk_metadata_has_ttl CI::Queue::TestChunk.new('TestA:full_suite', 'TestA', :full_suite, [], 1000.0) ] - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end @@ -209,7 +209,7 @@ def test_populate_with_many_chunks_uses_batching CI::Queue::TestChunk.new("TestSuite#{i}:full_suite", "TestSuite#{i}", :full_suite, [], 1000.0) end - CI::Queue.stub(:shuffle, chunks) do + @worker.stub(:reorder_tests, chunks) do @worker.populate(tests) end diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index e48cb7c3..c3b4cf2c 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -12,6 +12,10 @@ def setup @config = @queue.send(:config) # hack end + def teardown + @redis.flushdb + end + def test_from_uri second_queue = populate( CI::Queue.from_uri(@redis_url, config) @@ -260,7 +264,7 @@ def test_chunk_with_dynamic_timeout_not_stolen_by_other_worker worker1 = worker(1, tests: tests, build_id: '100', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 100.0) worker2 = worker(2, tests: tests, build_id: '100', strategy: :suite_bin_packing, - suite_max_duration: 120_000, timing_fallback_duration: 100.0) + suite_max_duration: 120_000, timing_fallback_duration: 100.0, populate: false) acquired = false worker2_tried = false @@ -428,6 +432,234 @@ def test_individual_test_uses_default_timeout_after_requeue assert_equal reserved_test.id, stolen_test.id end + def test_suite_bin_packing_uses_moving_average_for_duration + @redis.flushdb + + moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) + moving_avg.update('TestSuite#test_1', 5000.0) + moving_avg.update('TestSuite#test_2', 3000.0) + + tests = [ + MockTest.new('TestSuite#test_1'), + MockTest.new('TestSuite#test_2') + ] + + worker = worker(1, tests: tests, build_id: '200', strategy: :suite_bin_packing, + suite_max_duration: 120_000, timing_fallback_duration: 100.0) + + chunks = [] + worker.poll do |chunk| + chunks << chunk + worker.acknowledge(chunk) + end + + assert_equal 1, chunks.size + chunk = chunks.first + assert chunk.chunk?, "Expected a chunk" + assert_equal 'TestSuite:full_suite', chunk.id + assert_equal 8000.0, chunk.estimated_duration + end + + def test_moving_average_takes_precedence_over_timing_file + @redis.flushdb + require 'tempfile' + + timing_data = { 'TestSuite#test_1' => 10_000.0 } + + moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) + moving_avg.update('TestSuite#test_1', 2000.0) + + tests = [MockTest.new('TestSuite#test_1')] + + Tempfile.open(['timing', '.json']) do |file| + file.write(JSON.generate(timing_data)) + file.close + + worker = worker(1, tests: tests, build_id: '201', strategy: :suite_bin_packing, + suite_max_duration: 120_000, timing_fallback_duration: 100.0, + timing_file: file.path) + + chunks = [] + worker.poll do |chunk| + chunks << chunk + worker.acknowledge(chunk) + end + + assert_equal 1, chunks.size + assert_equal 2000.0, chunks.first.estimated_duration + end + end + + def test_falls_back_to_timing_file_when_no_moving_average + @redis.flushdb + require 'tempfile' + + timing_data = { 'TestSuite#test_1' => 7000.0 } + tests = [MockTest.new('TestSuite#test_1')] + + Tempfile.open(['timing', '.json']) do |file| + file.write(JSON.generate(timing_data)) + file.close + + worker = worker(1, tests: tests, build_id: '202', strategy: :suite_bin_packing, + suite_max_duration: 120_000, timing_fallback_duration: 100.0, + timing_file: file.path) + + chunks = [] + worker.poll do |chunk| + chunks << chunk + worker.acknowledge(chunk) + end + + assert_equal 1, chunks.size + assert_equal 7000.0, chunks.first.estimated_duration + end + end + + def test_falls_back_to_default_when_no_moving_average_or_timing_data + @redis.flushdb + + tests = [MockTest.new('UnknownTest#test_1')] + + worker = worker(1, tests: tests, build_id: '203', strategy: :suite_bin_packing, + suite_max_duration: 120_000, timing_fallback_duration: 500.0) + + chunks = [] + worker.poll do |chunk| + chunks << chunk + worker.acknowledge(chunk) + end + + assert_equal 1, chunks.size + assert_equal 500.0, chunks.first.estimated_duration + end + + def test_mixed_duration_sources_in_suite_splitting + @redis.flushdb + require 'tempfile' + + moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) + moving_avg.update('MixedTest#test_1', 60_000.0) + moving_avg.update('MixedTest#test_2', 50_000.0) + + timing_data = { + 'MixedTest#test_3' => 40_000.0, + 'MixedTest#test_4' => 30_000.0 + } + + tests = [ + MockTest.new('MixedTest#test_1'), + MockTest.new('MixedTest#test_2'), + MockTest.new('MixedTest#test_3'), + MockTest.new('MixedTest#test_4') + ] + + Tempfile.open(['timing', '.json']) do |file| + file.write(JSON.generate(timing_data)) + file.close + + worker = worker(1, tests: tests, build_id: '204', strategy: :suite_bin_packing, + suite_max_duration: 120_000, suite_buffer_percent: 10, + timing_fallback_duration: 100.0, timing_file: file.path) + + chunks = [] + worker.poll do |chunk| + chunks << chunk + worker.acknowledge(chunk) + end + + assert chunks.size >= 2 + + effective_max = 120_000 * (1 - 10 / 100.0) + chunks.each do |chunk| + assert chunk.estimated_duration <= effective_max, + "Chunk duration #{chunk.estimated_duration} exceeds effective max #{effective_max}" + end + end + end + + def test_moving_average_ordering_by_duration + @redis.flushdb + + moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) + moving_avg.update('FastTest#test_1', 1000.0) + moving_avg.update('SlowTest#test_1', 10_000.0) + moving_avg.update('MediumTest#test_1', 5000.0) + + tests = [ + MockTest.new('FastTest#test_1'), + MockTest.new('SlowTest#test_1'), + MockTest.new('MediumTest#test_1') + ] + + worker = worker(1, tests: tests, build_id: '205', strategy: :suite_bin_packing, + suite_max_duration: 120_000, timing_fallback_duration: 100.0) + + chunks = [] + worker.poll do |chunk| + chunks << chunk + worker.acknowledge(chunk) + end + + # Should be ordered by duration descending: SlowTest, MediumTest, FastTest + assert_equal 3, chunks.size + assert_equal 'SlowTest:full_suite', chunks[0].id + assert_equal 10_000.0, chunks[0].estimated_duration + assert_equal 'MediumTest:full_suite', chunks[1].id + assert_equal 5000.0, chunks[1].estimated_duration + assert_equal 'FastTest:full_suite', chunks[2].id + assert_equal 1000.0, chunks[2].estimated_duration + end + + def test_moving_average_with_partial_coverage + @redis.flushdb + + # Only one test has moving average data + moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) + moving_avg.update('PartialTest#test_1', 3000.0) + + tests = [ + MockTest.new('PartialTest#test_1'), + MockTest.new('PartialTest#test_2'), + MockTest.new('PartialTest#test_3') + ] + + worker = worker(1, tests: tests, build_id: '206', strategy: :suite_bin_packing, + suite_max_duration: 120_000, timing_fallback_duration: 500.0) + + chunks = [] + worker.poll do |chunk| + chunks << chunk + worker.acknowledge(chunk) + end + + assert_equal 1, chunks.size + assert_equal 4000.0, chunks.first.estimated_duration + end + + def test_moving_average_updates_persist_across_workers + @redis.flushdb + + # Manually update moving average as if a previous worker completed the test + moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) + moving_avg.update('PersistTest#test_1', 5500.0) + + # New worker should see the persisted moving average + tests = [MockTest.new('PersistTest#test_1')] + worker1 = worker(1, tests: tests, build_id: '207', strategy: :suite_bin_packing, + suite_max_duration: 120_000, timing_fallback_duration: 1000.0) + + chunks = [] + worker1.poll do |chunk| + chunks << chunk + worker1.acknowledge(chunk) + end + + # Should use the persisted moving average value + assert_equal 1, chunks.size + assert_equal 5500.0, chunks.first.estimated_duration + end + private class MockTest @@ -451,7 +683,7 @@ def tests end def shuffled_test_list - CI::Queue.shuffle(TEST_LIST, Random.new(0)).freeze + TEST_LIST.sort.shuffle(random: Random.new(0)).freeze end def build_queue diff --git a/ruby/test/ci/queue/strategy/suite_bin_packing_test.rb b/ruby/test/ci/queue/strategy/suite_bin_packing_test.rb index fcd97fde..b5ae7993 100644 --- a/ruby/test/ci/queue/strategy/suite_bin_packing_test.rb +++ b/ruby/test/ci/queue/strategy/suite_bin_packing_test.rb @@ -2,14 +2,16 @@ require 'test_helper' require 'tempfile' +require 'minitest/focus' + class SuiteBinPackingTest < Minitest::Test def setup - @strategy = CI::Queue::Strategy::SuiteBinPacking.new @config = CI::Queue::Configuration.new( suite_max_duration: 120_000, suite_buffer_percent: 10, timing_fallback_duration: 100.0 ) + @strategy = CI::Queue::Strategy::SuiteBinPacking.new(@config) end def test_groups_tests_by_suite @@ -19,7 +21,7 @@ def test_groups_tests_by_suite 'OrderTest#test_1' ]) - chunks = @strategy.order_tests(tests, config: @config) + chunks = @strategy.order_tests(tests) suite_names = chunks.map(&:suite_name).uniq assert_includes suite_names, 'UserTest' @@ -115,7 +117,7 @@ def test_orders_chunks_by_duration_descending end def test_handles_empty_test_list - chunks = @strategy.order_tests([], config: @config) + chunks = @strategy.order_tests([]) assert_equal [], chunks end @@ -123,7 +125,7 @@ def test_handles_missing_timing_file tests = create_mock_tests(['TestA#test_1']) @config.timing_file = '/nonexistent/file.json' - chunks = @strategy.order_tests(tests, config: @config) + chunks = @strategy.order_tests(tests) # Should use fallback duration assert_equal 1, chunks.size @@ -138,7 +140,7 @@ def test_handles_malformed_timing_file file.close @config.timing_file = file.path - chunks = @strategy.order_tests(tests, config: @config) + chunks = @strategy.order_tests(tests) # Should use fallback duration assert_equal 1, chunks.size @@ -148,8 +150,8 @@ def test_handles_malformed_timing_file def test_chunk_ids_are_deterministic tests = create_mock_tests(['TestSuite#test_1']) - chunks1 = @strategy.order_tests(tests, config: @config) - chunks2 = @strategy.order_tests(tests, config: @config) + chunks1 = @strategy.order_tests(tests) + chunks2 = @strategy.order_tests(tests) assert_equal chunks1.first.id, chunks2.first.id end @@ -177,7 +179,7 @@ def test_split_suite_chunk_ids_include_index def test_full_suite_chunk_id_format tests = create_mock_tests(['SmallTest#test_1']) - chunks = @strategy.order_tests(tests, config: @config) + chunks = @strategy.order_tests(tests) chunk = chunks.find { |c| c.suite_name == 'SmallTest' } assert_equal 'SmallTest:full_suite', chunk.id @@ -197,7 +199,9 @@ def order_with_timing(tests, timing_data) file.close @config.timing_file = file.path - @strategy.order_tests(tests, config: @config) + # Recreate strategy to load the new timing data + strategy = CI::Queue::Strategy::SuiteBinPacking.new(@config) + strategy.order_tests(tests) end end From 6b085d7450a1f5006099a2957a525a0d8dc88711 Mon Sep 17 00:00:00 2001 From: Ernesto Barajas Date: Wed, 29 Oct 2025 13:03:56 -0500 Subject: [PATCH 2/4] fix estimated duration print --- ruby/lib/minitest/queue.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index 67ebd0cb..9b0770c1 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -243,7 +243,7 @@ def run_chunk(chunk, reporter) chunk_failed = false failed_tests = [] - puts "Running chunk: #{chunk.suite_name} (#{chunk.size} tests) :: Estimated Duration: (#{chunk.estimated_duration}s)" if ENV['VERBOSE'] + puts "Running chunk: #{chunk.suite_name} (#{chunk.size} tests) :: Estimated Duration: (#{chunk.estimated_duration / 1000}s)" if ENV['VERBOSE'] # Run each test in the chunk sequentially chunk.tests.each do |test| From 881822030182aa1c4b2085fcae991d8d5e89472f Mon Sep 17 00:00:00 2001 From: Ernesto Barajas Date: Thu, 30 Oct 2025 15:37:28 -0500 Subject: [PATCH 3/4] remove require mintest/focus --- ruby/test/ci/queue/strategy/suite_bin_packing_test.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/ruby/test/ci/queue/strategy/suite_bin_packing_test.rb b/ruby/test/ci/queue/strategy/suite_bin_packing_test.rb index b5ae7993..880cf834 100644 --- a/ruby/test/ci/queue/strategy/suite_bin_packing_test.rb +++ b/ruby/test/ci/queue/strategy/suite_bin_packing_test.rb @@ -2,8 +2,6 @@ require 'test_helper' require 'tempfile' -require 'minitest/focus' - class SuiteBinPackingTest < Minitest::Test def setup @config = CI::Queue::Configuration.new( From 9842be05df1e2b805816b5bba8f1d5aef8ebe9e4 Mon Sep 17 00:00:00 2001 From: Ernesto Barajas Date: Tue, 4 Nov 2025 09:15:23 -0800 Subject: [PATCH 4/4] break apart updating and fetching the hash into separate classes --- ruby/lib/ci/queue/redis.rb | 3 +- .../redis/test_duration_moving_averages.rb | 42 +++++ ruby/lib/ci/queue/redis/test_time_record.rb | 2 +- ...=> update_test_duration_moving_average.rb} | 33 +--- .../ci/queue/strategy/suite_bin_packing.rb | 2 +- .../ci/queue/redis/moving_average_test.rb | 147 ++++++++++-------- ruby/test/ci/queue/redis_test.rb | 32 ++-- 7 files changed, 142 insertions(+), 119 deletions(-) create mode 100644 ruby/lib/ci/queue/redis/test_duration_moving_averages.rb rename ruby/lib/ci/queue/redis/{moving_average.rb => update_test_duration_moving_average.rb} (53%) diff --git a/ruby/lib/ci/queue/redis.rb b/ruby/lib/ci/queue/redis.rb index 3642a117..c3876efe 100644 --- a/ruby/lib/ci/queue/redis.rb +++ b/ruby/lib/ci/queue/redis.rb @@ -10,7 +10,8 @@ require 'ci/queue/redis/supervisor' require 'ci/queue/redis/grind_supervisor' require 'ci/queue/redis/test_time_record' -require 'ci/queue/redis/moving_average' +require 'ci/queue/redis/test_duration_moving_averages' +require 'ci/queue/redis/update_test_duration_moving_average' module CI module Queue diff --git a/ruby/lib/ci/queue/redis/test_duration_moving_averages.rb b/ruby/lib/ci/queue/redis/test_duration_moving_averages.rb new file mode 100644 index 00000000..86cc4bc6 --- /dev/null +++ b/ruby/lib/ci/queue/redis/test_duration_moving_averages.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module CI + module Queue + module Redis + class TestDurationMovingAverages + def initialize(redis, key: "test_duration_moving_averages") + @redis = redis + @key = key + @loaded = false + end + + def loaded? + @loaded + end + + def [](test_id) + load_all unless loaded? + @values[test_id] + end + + def load_all + batch_size = 1000 + cursor = '0' + @values = {} + + loop do + cursor, batch = @redis.hscan(@key, cursor, count: batch_size) + batch.each do |test_id, value| + @values[test_id] = value.to_f + end + break if cursor == '0' + end + end + + def size + @redis.hlen(@key) + end + end + end + end +end diff --git a/ruby/lib/ci/queue/redis/test_time_record.rb b/ruby/lib/ci/queue/redis/test_time_record.rb index a45dd2b7..a78d4ea4 100644 --- a/ruby/lib/ci/queue/redis/test_time_record.rb +++ b/ruby/lib/ci/queue/redis/test_time_record.rb @@ -32,7 +32,7 @@ def record_test_time(test_name, duration) end def record_test_duration_moving_average(test_name, duration) - MovingAverage.new(redis).update(test_name, duration) + UpdateTestDurationMovingAverage.new(redis).update(test_name, duration) end def record_test_name(test_name) diff --git a/ruby/lib/ci/queue/redis/moving_average.rb b/ruby/lib/ci/queue/redis/update_test_duration_moving_average.rb similarity index 53% rename from ruby/lib/ci/queue/redis/moving_average.rb rename to ruby/lib/ci/queue/redis/update_test_duration_moving_average.rb index 501bb8bb..993d7fcd 100644 --- a/ruby/lib/ci/queue/redis/moving_average.rb +++ b/ruby/lib/ci/queue/redis/update_test_duration_moving_average.rb @@ -3,10 +3,7 @@ module CI module Queue module Redis - # Represents a redis hash of moving averages for test durations - # - # Moving average is calculated using exponential moving average formula - class MovingAverage + class UpdateTestDurationMovingAverage LUA_SCRIPT= <<~LUA local hash_key = KEYS[1] local test_id = ARGV[1] @@ -28,36 +25,10 @@ def initialize(redis, key: "test_duration_moving_averages", smoothing_factor: 0. @redis = redis @key = key @smoothing_factor = smoothing_factor - @values = {} - end - - def [](test_id) - load_all if @values.empty? - @values[test_id] end def update(test_id, duration) - new_avg = @redis.eval(LUA_SCRIPT, keys: [@key], argv: [test_id, duration, @smoothing_factor]) - @values[test_id] = new_avg.to_f - new_avg.to_f - end - - def load_all - batch_size = 1000 - cursor = '0' - @values = {} - - loop do - cursor, batch = @redis.hscan(@key, cursor, count: batch_size) - batch.each do |test_id, value| - @values[test_id] = value.to_f - end - break if cursor == '0' - end - end - - def size - @redis.hlen(@key) + @redis.eval(LUA_SCRIPT, keys: [@key], argv: [test_id, duration, @smoothing_factor]).to_f end end end diff --git a/ruby/lib/ci/queue/strategy/suite_bin_packing.rb b/ruby/lib/ci/queue/strategy/suite_bin_packing.rb index ff16788d..edd4c2be 100644 --- a/ruby/lib/ci/queue/strategy/suite_bin_packing.rb +++ b/ruby/lib/ci/queue/strategy/suite_bin_packing.rb @@ -21,7 +21,7 @@ def initialize(config, redis: nil) super(config) if redis - @moving_average = CI::Queue::Redis::MovingAverage.new(redis) + @moving_average = CI::Queue::Redis::TestDurationMovingAverages.new(redis) end if config&.timing_file diff --git a/ruby/test/ci/queue/redis/moving_average_test.rb b/ruby/test/ci/queue/redis/moving_average_test.rb index d2f1ef5b..6bdbb1df 100644 --- a/ruby/test/ci/queue/redis/moving_average_test.rb +++ b/ruby/test/ci/queue/redis/moving_average_test.rb @@ -15,120 +15,128 @@ def teardown end def test_initialize - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) - assert_equal 0, ma.size + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) + assert_equal 0, reader.size end def test_initialize_with_custom_smoothing_factor - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key, smoothing_factor: 0.5) - ma.update('test1', 100.0) - first_avg = ma['test1'] + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key, smoothing_factor: 0.5) + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) + + updater.update('test1', 100.0) + first_avg = reader['test1'] assert_equal 100.0, first_avg - ma.update('test1', 200.0) - second_avg = ma['test1'] + updater.update('test1', 200.0) + second_avg = reader['test1'] assert_in_delta 150.0, second_avg, 0.001 end def test_update_creates_new_entry - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) - result = ma.update('test1', 10.5) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key) + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) + + result = updater.update('test1', 10.5) assert_equal 10.5, result - assert_equal 1, ma.size + assert_equal 1, reader.size end def test_update_calculates_exponential_moving_average - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key, smoothing_factor: 0.2) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key, smoothing_factor: 0.2) - first_avg = ma.update('test1', 100.0) + first_avg = updater.update('test1', 100.0) assert_equal 100.0, first_avg # EMA = 0.2 * 150 + 0.8 * 100 = 30 + 80 = 110 - second_avg = ma.update('test1', 150.0) + second_avg = updater.update('test1', 150.0) assert_in_delta 110.0, second_avg, 0.001 - third_avg = ma.update('test1', 200.0) + third_avg = updater.update('test1', 200.0) assert_in_delta 128.0, third_avg, 0.001 end def test_update_multiple_tests - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key) + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) - ma.update('test1', 10.0) - ma.update('test2', 20.0) - ma.update('test3', 30.0) + updater.update('test1', 10.0) + updater.update('test2', 20.0) + updater.update('test3', 30.0) - assert_equal 3, ma.size + assert_equal 3, reader.size end def test_bracket_operator_loads_and_returns_value - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) - ma.update('test1', 10.5) - ma.update('test2', 20.5) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key) + updater.update('test1', 10.5) + updater.update('test2', 20.5) - ma2 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) - assert_equal 10.5, ma2['test1'] - assert_equal 20.5, ma2['test2'] + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) + assert_equal 10.5, reader['test1'] + assert_equal 20.5, reader['test2'] end def test_bracket_operator_returns_nil_for_missing_key - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) - ma.update('test1', 10.5) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key) + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) + + updater.update('test1', 10.5) - assert_nil ma['nonexistent'] + assert_nil reader['nonexistent'] end def test_load_all_loads_all_values_from_redis - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) - ma.update('test1', 10.0) - ma.update('test2', 20.0) - ma.update('test3', 30.0) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key) + updater.update('test1', 10.0) + updater.update('test2', 20.0) + updater.update('test3', 30.0) - ma2 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) - ma2.load_all + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) + reader.load_all - assert_equal 10.0, ma2['test1'] - assert_equal 20.0, ma2['test2'] - assert_equal 30.0, ma2['test3'] + assert_equal 10.0, reader['test1'] + assert_equal 20.0, reader['test2'] + assert_equal 30.0, reader['test3'] end def test_load_all_handles_batches - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key) 1500.times do |i| - ma.update("test_#{i}", i.to_f) + updater.update("test_#{i}", i.to_f) end # Create new instance and load all - ma2 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) - ma2.load_all + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) + reader.load_all - assert_equal 1500, ma2.size - assert_equal 0.0, ma2['test_0'] - assert_equal 500.0, ma2['test_500'] - assert_equal 1499.0, ma2['test_1499'] + assert_equal 1500, reader.size + assert_equal 0.0, reader['test_0'] + assert_equal 500.0, reader['test_500'] + assert_equal 1499.0, reader['test_1499'] end def test_size_returns_correct_count - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key) + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) - assert_equal 0, ma.size + assert_equal 0, reader.size - ma.update('test1', 10.0) - assert_equal 1, ma.size + updater.update('test1', 10.0) + assert_equal 1, reader.size - ma.update('test2', 20.0) - assert_equal 2, ma.size + updater.update('test2', 20.0) + assert_equal 2, reader.size - ma.update('test1', 15.0) - assert_equal 2, ma.size + updater.update('test1', 15.0) + assert_equal 2, reader.size end def test_updates_persist_to_redis - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) - ma.update('test1', 10.5) - ma.update('test2', 20.5) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key) + updater.update('test1', 10.5) + updater.update('test2', 20.5) # Verify data is actually in Redis values = @redis.hgetall(@key) @@ -138,29 +146,30 @@ def test_updates_persist_to_redis end def test_concurrent_updates_from_different_instances - ma1 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key, smoothing_factor: 0.2) - ma2 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key, smoothing_factor: 0.2) + updater1 = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key, smoothing_factor: 0.2) + updater2 = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key, smoothing_factor: 0.2) + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) - ma1.update('test1', 100.0) - ma2.update('test1', 200.0) + updater1.update('test1', 100.0) + updater2.update('test1', 200.0) # Expected: 0.2 * 200 + 0.8 * 100 = 120 - assert_in_delta 120.0, ma2['test1'], 0.001 + assert_in_delta 120.0, reader['test1'], 0.001 end def test_handles_floating_point_precision - ma = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key) # Test with various floating point values - ma.update('test1', 0.123456789) - ma.update('test2', 999.999999) - ma.update('test3', 0.000001) + updater.update('test1', 0.123456789) + updater.update('test2', 999.999999) + updater.update('test3', 0.000001) # Load in new instance - ma2 = CI::Queue::Redis::MovingAverage.new(@redis, key: @key) + reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key) - assert_in_delta 0.123456789, ma2['test1'], 0.000001 - assert_in_delta 999.999999, ma2['test2'], 0.000001 - assert_in_delta 0.000001, ma2['test3'], 0.000001 + assert_in_delta 0.123456789, reader['test1'], 0.000001 + assert_in_delta 999.999999, reader['test2'], 0.000001 + assert_in_delta 0.000001, reader['test3'], 0.000001 end end diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index c3b4cf2c..a833c1b7 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -435,9 +435,9 @@ def test_individual_test_uses_default_timeout_after_requeue def test_suite_bin_packing_uses_moving_average_for_duration @redis.flushdb - moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) - moving_avg.update('TestSuite#test_1', 5000.0) - moving_avg.update('TestSuite#test_2', 3000.0) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) + updater.update('TestSuite#test_1', 5000.0) + updater.update('TestSuite#test_2', 3000.0) tests = [ MockTest.new('TestSuite#test_1'), @@ -466,8 +466,8 @@ def test_moving_average_takes_precedence_over_timing_file timing_data = { 'TestSuite#test_1' => 10_000.0 } - moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) - moving_avg.update('TestSuite#test_1', 2000.0) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) + updater.update('TestSuite#test_1', 2000.0) tests = [MockTest.new('TestSuite#test_1')] @@ -538,9 +538,9 @@ def test_mixed_duration_sources_in_suite_splitting @redis.flushdb require 'tempfile' - moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) - moving_avg.update('MixedTest#test_1', 60_000.0) - moving_avg.update('MixedTest#test_2', 50_000.0) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) + updater.update('MixedTest#test_1', 60_000.0) + updater.update('MixedTest#test_2', 50_000.0) timing_data = { 'MixedTest#test_3' => 40_000.0, @@ -581,10 +581,10 @@ def test_mixed_duration_sources_in_suite_splitting def test_moving_average_ordering_by_duration @redis.flushdb - moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) - moving_avg.update('FastTest#test_1', 1000.0) - moving_avg.update('SlowTest#test_1', 10_000.0) - moving_avg.update('MediumTest#test_1', 5000.0) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) + updater.update('FastTest#test_1', 1000.0) + updater.update('SlowTest#test_1', 10_000.0) + updater.update('MediumTest#test_1', 5000.0) tests = [ MockTest.new('FastTest#test_1'), @@ -615,8 +615,8 @@ def test_moving_average_with_partial_coverage @redis.flushdb # Only one test has moving average data - moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) - moving_avg.update('PartialTest#test_1', 3000.0) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) + updater.update('PartialTest#test_1', 3000.0) tests = [ MockTest.new('PartialTest#test_1'), @@ -641,8 +641,8 @@ def test_moving_average_updates_persist_across_workers @redis.flushdb # Manually update moving average as if a previous worker completed the test - moving_avg = CI::Queue::Redis::MovingAverage.new(@redis) - moving_avg.update('PersistTest#test_1', 5500.0) + updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis) + updater.update('PersistTest#test_1', 5500.0) # New worker should see the persisted moving average tests = [MockTest.new('PersistTest#test_1')]