Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 0 additions & 22 deletions ruby/lib/ci/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,6 @@ def requeueable?(test_result)
requeueable.nil? || requeueable.call(test_result)
end

def shuffle(tests, random, config: nil)
if shuffler
shuffler.call(tests, random)
else
strategy = get_strategy(config&.strategy)
strategy.order_tests(tests, random: random, config: config)
end
end

def from_uri(url, config)
uri = URI(url)
implementation = case uri.scheme
Expand All @@ -66,18 +57,5 @@ def from_uri(url, config)
end
implementation.from_uri(uri, config)
end

private

def get_strategy(strategy_name)
case strategy_name&.to_sym
when :timing_based
Strategy::TimingBased.new
when :suite_bin_packing
Strategy::SuiteBinPacking.new
else
Strategy::Random.new
end
end
end
end
22 changes: 22 additions & 0 deletions ruby/lib/ci/queue/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,28 @@ def rescue_connection_errors(handler = ->(err) { nil })
rescue *self::class::CONNECTION_ERRORS => err
handler.call(err)
end

def ordering_strategy
case config.strategy.to_sym
when :timing_based
Strategy::TimingBased.new(config)
when :suite_bin_packing
# pass redis if available
# need to think about a better way to structure queue/strategy interaction
redis_instance = if self.respond_to?(:redis, true) # include private methods
self.send(:redis)
else
nil
end
Strategy::SuiteBinPacking.new(config, redis: redis_instance)
else
Strategy::Random.new(config)
end
end

def reorder_tests(tests, random: Random.new)
ordering_strategy.order_tests(tests, random: random)
end
end
end
end
2 changes: 2 additions & 0 deletions ruby/lib/ci/queue/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
require 'ci/queue/redis/supervisor'
require 'ci/queue/redis/grind_supervisor'
require 'ci/queue/redis/test_time_record'
require 'ci/queue/redis/test_duration_moving_averages'
require 'ci/queue/redis/update_test_duration_moving_average'

module CI
module Queue
Expand Down
42 changes: 42 additions & 0 deletions ruby/lib/ci/queue/redis/test_duration_moving_averages.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

module CI
module Queue
module Redis
class TestDurationMovingAverages
def initialize(redis, key: "test_duration_moving_averages")
@redis = redis
@key = key
@loaded = false
end

def loaded?
@loaded
end

def [](test_id)
load_all unless loaded?
@values[test_id]
end

def load_all
batch_size = 1000
cursor = '0'
@values = {}

loop do
cursor, batch = @redis.hscan(@key, cursor, count: batch_size)
batch.each do |test_id, value|
@values[test_id] = value.to_f
end
break if cursor == '0'
end
end

def size
@redis.hlen(@key)
end
end
end
end
end
6 changes: 6 additions & 0 deletions ruby/lib/ci/queue/redis/test_time_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ module Redis
class TestTimeRecord < Worker
def record(test_name, duration)
record_test_time(test_name, duration)
record_test_duration_moving_average(test_name, duration)
record_test_name(test_name)

end

def fetch
Expand All @@ -29,6 +31,10 @@ def record_test_time(test_name, duration)
nil
end

def record_test_duration_moving_average(test_name, duration)
UpdateTestDurationMovingAverage.new(redis).update(test_name, duration)
end

def record_test_name(test_name)
redis.pipelined do |pipeline|
pipeline.lpush(
Expand Down
36 changes: 36 additions & 0 deletions ruby/lib/ci/queue/redis/update_test_duration_moving_average.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

module CI
module Queue
module Redis
class UpdateTestDurationMovingAverage
LUA_SCRIPT= <<~LUA
local hash_key = KEYS[1]
local test_id = ARGV[1]
local new_duration = tonumber(ARGV[2])
local smoothing = tonumber(ARGV[3])
local current_avg = redis.call('HGET', hash_key, test_id)
if current_avg then
current_avg = tonumber(current_avg)
local new_avg = smoothing * new_duration + (1 - smoothing) * current_avg
redis.call('HSET', hash_key, test_id, new_avg)
return tostring(new_avg)
else
redis.call('HSET', hash_key, test_id, new_duration)
return tostring(new_duration)
end
LUA

def initialize(redis, key: "test_duration_moving_averages", smoothing_factor: 0.2)
@redis = redis
@key = key
@smoothing_factor = smoothing_factor
end

def update(test_id, duration)
@redis.eval(LUA_SCRIPT, keys: [@key], argv: [test_id, duration, @smoothing_factor]).to_f
end
end
end
end
end
2 changes: 1 addition & 1 deletion ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def distributed?

def populate(tests, random: Random.new)
@index = tests.map { |t| [t.id, t] }.to_h
executables = Queue.shuffle(tests, random, config: config)
executables = reorder_tests(tests, random: random)

# Separate chunks from individual tests
chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
Expand Down
10 changes: 8 additions & 2 deletions ruby/lib/ci/queue/strategy/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ module CI
module Queue
module Strategy
class Base
def order_tests(tests, random: Random.new, config: nil)
def initialize(config)
@config = config
end

attr_reader :config

def order_tests(tests)
raise NotImplementedError, "#{self.class} must implement #order_tests"
end
end
end
end
end
end
4 changes: 2 additions & 2 deletions ruby/lib/ci/queue/strategy/random.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ module CI
module Queue
module Strategy
class Random < Base
def order_tests(tests, random: Random.new, config: nil)
def order_tests(tests, random: Random.new)
tests.sort.shuffle(random: random)
end
end
end
end
end
end
76 changes: 46 additions & 30 deletions ruby/lib/ci/queue/strategy/suite_bin_packing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,36 @@ module CI
module Queue
module Strategy
class SuiteBinPacking < Base
def order_tests(tests, random: Random.new, config: nil)
timing_data = load_timing_data(config&.timing_file)
max_duration = config&.suite_max_duration || 120_000
fallback_duration = config&.timing_fallback_duration || 100.0
buffer_percent = config&.suite_buffer_percent || 10
class << self
def load_timing_data(file_path)
return {} unless file_path && ::File.exist?(file_path)

JSON.parse(::File.read(file_path))
rescue JSON::ParserError => e
warn "Warning: Could not parse timing file #{file_path}: #{e.message}"
{}
end
end

def initialize(config, redis: nil)
super(config)

if redis
@moving_average = CI::Queue::Redis::TestDurationMovingAverages.new(redis)
end

if config&.timing_file
@timing_data = self.class.load_timing_data(config.timing_file)
else
@timing_data = {}
end

@max_duration = config&.suite_max_duration || 120_000
@fallback_duration = config&.timing_fallback_duration || 100.0
@buffer_percent = config&.suite_buffer_percent || 10
end

def order_tests(tests, random: ::Random.new, redis: nil)
# Group tests by suite name
suites = tests.group_by { |test| extract_suite_name(test.id) }

Expand All @@ -22,10 +46,6 @@ def order_tests(tests, random: Random.new, config: nil)
create_chunks_for_suite(
suite_name,
suite_tests,
max_duration,
buffer_percent,
timing_data,
fallback_duration
)
)
end
Expand All @@ -40,27 +60,27 @@ def extract_suite_name(test_id)
test_id.split('#').first
end

def load_timing_data(file_path)
return {} unless file_path && ::File.exist?(file_path)

JSON.parse(::File.read(file_path))
rescue JSON::ParserError => e
warn "Warning: Could not parse timing file #{file_path}: #{e.message}"
{}
end
def get_test_duration(test_id)
if @moving_average
avg = @moving_average[test_id]
return avg if avg
end

def get_test_duration(test_id, timing_data, fallback_duration)
timing_data[test_id]&.to_f || fallback_duration
if @timing_data.key?(test_id)
@timing_data[test_id]
else
@fallback_duration
end
end

def create_chunks_for_suite(suite_name, suite_tests, max_duration, buffer_percent, timing_data, fallback_duration)
def create_chunks_for_suite(suite_name, suite_tests)
# Calculate total suite duration
total_duration = suite_tests.sum do |test|
get_test_duration(test.id, timing_data, fallback_duration)
get_test_duration(test.id)
end

# If suite fits in max duration, create full_suite chunk
if total_duration <= max_duration
if total_duration <= @max_duration
chunk_id = "#{suite_name}:full_suite"
# Don't store test_ids in Redis - worker will resolve from index
# But pass test_count for timeout calculation
Expand All @@ -71,20 +91,16 @@ def create_chunks_for_suite(suite_name, suite_tests, max_duration, buffer_percen
split_suite_into_chunks(
suite_name,
suite_tests,
max_duration,
buffer_percent,
timing_data,
fallback_duration
)
end

def split_suite_into_chunks(suite_name, suite_tests, max_duration, buffer_percent, timing_data, fallback_duration)
def split_suite_into_chunks(suite_name, suite_tests)
# Apply buffer to max duration
effective_max = max_duration * (1 - buffer_percent / 100.0)
effective_max = @max_duration * (1 - @buffer_percent / 100.0)

# Sort tests by duration (longest first for better bin packing)
sorted_tests = suite_tests.sort_by do |test|
-get_test_duration(test.id, timing_data, fallback_duration)
-get_test_duration(test.id)
end

# First-fit decreasing bin packing
Expand All @@ -94,7 +110,7 @@ def split_suite_into_chunks(suite_name, suite_tests, max_duration, buffer_percen
chunk_index = 0

sorted_tests.each do |test|
test_duration = get_test_duration(test.id, timing_data, fallback_duration)
test_duration = get_test_duration(test.id)

if current_chunk_duration + test_duration > effective_max && current_chunk_tests.any?
# Finalize current chunk and start new one
Expand Down
6 changes: 3 additions & 3 deletions ruby/lib/ci/queue/strategy/timing_based.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module CI
module Queue
module Strategy
class TimingBased < Base
def order_tests(tests, random: Random.new, config: nil)
def order_tests(tests, random: Random.new)
timing_data = load_timing_data(config&.timing_file)
fallback_duration = config&.timing_fallback_duration || 100.0

Expand All @@ -20,7 +20,7 @@ def order_tests(tests, random: Random.new, config: nil)

def load_timing_data(file_path)
return {} unless file_path && ::File.exist?(file_path)

JSON.parse(::File.read(file_path))
rescue JSON::ParserError => e
warn "Warning: Could not parse timing file #{file_path}: #{e.message}"
Expand All @@ -32,4 +32,4 @@ def load_timing_data(file_path)
end
end
end
end
end
2 changes: 1 addition & 1 deletion ruby/lib/minitest/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def run_chunk(chunk, reporter)
chunk_failed = false
failed_tests = []

puts "Running chunk: #{chunk.suite_name} (#{chunk.size} tests) :: Estimated Duration: (#{chunk.estimated_duration}s)" if ENV['VERBOSE']
puts "Running chunk: #{chunk.suite_name} (#{chunk.size} tests) :: Estimated Duration: (#{chunk.estimated_duration / 1000}s)" if ENV['VERBOSE']

# Run each test in the chunk sequentially
chunk.tests.each do |test|
Expand Down
Loading