Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions redis/push_queue.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
local master_status_key = KEYS[1]
local queue_key = KEYS[2]
local total_key = KEYS[3]
local generation_key = KEYS[4]

local expected_lock = ARGV[1]
local generation_uuid = ARGV[2]
local total_count = ARGV[3]
local redis_ttl = tonumber(ARGV[4])

-- CAS: verify we still own the lock
if redis.call('get', master_status_key) ~= expected_lock then
return 0
end

-- Push test IDs to queue (ARGV[5] onwards)
for i = 5, #ARGV do
redis.call('lpush', queue_key, ARGV[i])
end

-- Set metadata
redis.call('set', total_key, total_count)
redis.call('set', generation_key, generation_uuid)
redis.call('set', master_status_key, 'ready')

-- Apply TTLs
redis.call('expire', queue_key, redis_ttl)
redis.call('expire', total_key, redis_ttl)
redis.call('expire', generation_key, redis_ttl)
redis.call('expire', master_status_key, redis_ttl)

return 1
10 changes: 10 additions & 0 deletions redis/renew_master_lock.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
local master_status_key = KEYS[1]
local expected_value = ARGV[1]
local ttl = tonumber(ARGV[2])

if redis.call('get', master_status_key) == expected_value then
redis.call('expire', master_status_key, ttl)
return 1
else
return 0
end
7 changes: 6 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class Configuration
attr_accessor :timing_redis_url
attr_accessor :write_duration_averages
attr_accessor :heartbeat_grace_period, :heartbeat_interval
attr_accessor :master_lock_ttl, :max_election_attempts
attr_reader :circuit_breakers
attr_writer :seed, :build_id
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
Expand Down Expand Up @@ -66,7 +67,9 @@ def initialize(
branch: nil,
timing_redis_url: nil,
heartbeat_grace_period: 30,
heartbeat_interval: 10
heartbeat_interval: 10,
master_lock_ttl: 30,
max_election_attempts: 3
)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
Expand Down Expand Up @@ -105,6 +108,8 @@ def initialize(
@write_duration_averages = false
@heartbeat_grace_period = heartbeat_grace_period
@heartbeat_interval = heartbeat_interval
@master_lock_ttl = master_lock_ttl
@max_election_attempts = max_election_attempts
end

def queue_init_timeout
Expand Down
1 change: 1 addition & 0 deletions ruby/lib/ci/queue/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module Queue
module Redis
Error = Class.new(StandardError)
LostMaster = Class.new(Error)
MasterDied = Class.new(Error)

class << self

Expand Down
55 changes: 47 additions & 8 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ def created_at=(timestamp)

def size
redis.multi do |transaction|
transaction.llen(key('queue'))
transaction.zcard(key('running'))
transaction.llen(generation_key('queue'))
transaction.zcard(generation_key('running'))
end.inject(:+)
end

def to_a
redis.multi do |transaction|
transaction.lrange(key('queue'), 0, -1)
transaction.zrange(key('running'), 0, -1)
transaction.lrange(generation_key('queue'), 0, -1)
transaction.zrange(generation_key('running'), 0, -1)
end.flatten.reverse.map { |k| index.fetch(k) }
end

Expand All @@ -56,11 +56,25 @@ def progress
def wait_for_master(timeout: 120)
return true if master?

(timeout * 10 + 1).to_i.times do
return true if queue_initialized?
deadline = CI::Queue.time_now + timeout
last_status = nil

while CI::Queue.time_now < deadline
status = master_status

if status == 'ready' || status == 'finished'
learn_generation unless master?
return true
end

if status.nil? && last_status == 'setup'
raise MasterDied, "Master lock expired during setup"
end

last_status = status
sleep 0.1
end

raise LostMaster, "The master worker (worker #{master_worker_id}) is still `#{master_status}` after #{timeout} seconds waiting."
end

Expand All @@ -71,7 +85,14 @@ def workers_count
def queue_initialized?
@queue_initialized ||= begin
status = master_status
%w[ready finished].include?(status)
if %w[ready finished].include?(status)
learn_generation unless current_generation
true
else
false
end
rescue MasterDied
false
end
end

Expand Down Expand Up @@ -105,12 +126,30 @@ def key(*args)
['build', build_id, *args].join(':')
end

def generation_key(*args)
gen = @generation || @current_generation
return key(*args) unless gen
key('gen', gen, *args)
end

def learn_generation
@current_generation = redis.get(key('current-generation'))
raise MasterDied, "No generation available" unless @current_generation
@current_generation
end

def current_generation
@generation || @current_generation
end

def build_id
config.build_id
end

def master_status
redis.get(key('master-status'))
status = redis.get(key('master-status'))
return 'setup' if status&.start_with?('setup:')
status
end

def eval_script(script, *args)
Expand Down
8 changes: 7 additions & 1 deletion ruby/lib/ci/queue/redis/build_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def passing_tests

TOTAL_KEY = "___total___"
def requeued_tests
requeues = redis.hgetall(key('requeues-count'))
requeues = redis.hgetall(generation_key('requeues-count'))
requeues.delete(TOTAL_KEY)
requeues
end
Expand Down Expand Up @@ -123,6 +123,12 @@ def record_stats(stats, pipeline: redis)
end
end

def generation_key(*args)
gen = @queue.respond_to?(:current_generation) ? @queue.current_generation : nil
return key(*args) unless gen
key('gen', gen, *args)
end

def key(*args)
['build', config.build_id, *args].join(':')
end
Expand Down
4 changes: 2 additions & 2 deletions ruby/lib/ci/queue/redis/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def master?

def total
wait_for_master(timeout: config.queue_init_timeout)
redis.get(key('total')).to_i
redis.get(generation_key('total')).to_i
end

def build
Expand Down Expand Up @@ -53,7 +53,7 @@ def wait_for_workers

def active_workers?
# if there are running jobs we assume there are still agents active
redis.zrangebyscore(key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0
redis.zrangebyscore(generation_key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0
end
end
end
Expand Down
Loading
Loading