Skip to content

Commit

Permalink
Added configuration options for min and max priority, added status se…
Browse files Browse the repository at this point in the history
…rver port to the global config.
  • Loading branch information
Roy committed Dec 16, 2023
1 parent b334272 commit 0a44507
Show file tree
Hide file tree
Showing 16 changed files with 283 additions and 135 deletions.
182 changes: 110 additions & 72 deletions README.md

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions lib/exekutor/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,26 @@ def set_db_connection_name?
# @!method $1?
# The rack handler for the status server
# === Default value:
# webrick
# +"webrick"+
# @return [String]
# @!method $1=(value)
# Sets the rack handler for the status server. The handler should respond to +#shutdown+ or +#stop+.
# @param value [String] the name of the handler
# @return [self]
define_option :status_server_handler, default: "webrick", type: String

# @!macro
# @!method $1?
# The port number for the status server
# === Default value:
# +nil+ (ie. the status server is disabled)
# @return [Integer]
# @!method $1=(value)
# Sets the port number for the status server.
# @param value [Integer] the port number
# @return [self]
define_option :status_server_port, default: nil, type: Integer

# @!macro
# @!method $1?
# The heartbeat timeout for the `/live` endpoint of the status server. If the heartbeat of a worker
Expand Down Expand Up @@ -303,7 +315,8 @@ def worker_options
%i[enable_listener delete_completed_jobs delete_discarded_jobs delete_failed_jobs].each do |option|
opts[option] = send(:"#{option}?") ? true : false
end
%i[polling_interval polling_jitter status_server_handler healthcheck_timeout].each do |option|
%i[polling_interval polling_jitter status_server_handler status_server_port healthcheck_timeout]
.each do |option|
opts[option] = send(option)
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/exekutor/internal/cli/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ def self.define_start_options(cmd)
cmd.flag %i[env environment], desc: "The Rails environment"
cmd.flag %i[q queue], default_value: Manager::DEFAULT_QUEUE, multiple: true,
desc: "Queue to work from"
cmd.flag %i[p priority], type: String, default_value: Manager::DEFAULT_PRIORITIES,
desc: "The job priorities to execute, specified as `min` or `min:max`"
cmd.flag %i[t threads], type: String, default_value: Manager::DEFAULT_THREADS,
desc: "The number of threads for executing jobs, specified as `min:max`"
cmd.flag %i[p poll_interval], type: Integer, default_value: DefaultOptionValue.new(value: 60),
cmd.flag %i[i poll_interval], type: Integer, default_value: DefaultOptionValue.new(value: 60),
desc: "Interval between polls for available jobs (in seconds)"
cmd.flag %i[cfg configfile], type: String, default_value: Manager::DEFAULT_CONFIG_FILES, multiple: true,
desc: "The YAML configuration file to load. If specifying multiple files, the last file takes " \
Expand Down
46 changes: 30 additions & 16 deletions lib/exekutor/internal/cli/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def initialize(options)
# @option options [String] :environment The Rails environment to load
# @option options [String] :queue The queue(s) to watch
# @option options [String] :threads The number of threads to use for job execution
# @option options [String] :priority The priorities to execute
# @option options [Integer] :poll_interval The interval in seconds for job polling
# @return [Void]
def start(options)
Expand Down Expand Up @@ -97,16 +98,22 @@ def cli_worker_overrides(cli_options)
.reject { |_, value| value.is_a? DefaultOptionValue }
.transform_keys(poll_interval: :polling_interval)

min_threads, max_threads = parse_thread_limitations(cli_options[:threads])
min_threads, max_threads = parse_integer_range(cli_options[:threads])
if min_threads
worker_options[:min_threads] = min_threads
worker_options[:max_threads] = max_threads || min_threads
end

min_priority, max_priority = parse_integer_range(cli_options[:priority])
if min_threads
worker_options[:min_priority] = min_priority
worker_options[:max_priority] = max_priority if max_priority
end

worker_options
end

def parse_thread_limitations(threads)
def parse_integer_range(threads)
return if threads.blank? || threads.is_a?(DefaultOptionValue)

if threads.is_a?(Integer)
Expand Down Expand Up @@ -237,20 +244,33 @@ def initialize(files, options)
def load_config(worker_options)
each_file do |path|
config = load_config_file(path)
apply_config_file(config, worker_options)
convert_duration_options! config

worker_options.merge! extract_worker_options!(config)
apply_config_file(config)
end
Exekutor.config
end

private

WORKER_OPTIONS = %i[queues min_priority max_priority min_threads max_threads max_thread_idletime
wait_for_termination].freeze

def each_file(&block)
if @config_files.is_a? DefaultConfigFileValue
@config_files.to_a(@options[:identifier]).each(&block)
elsif @config_files
elsif @config_files.is_a? String
yield File.expand_path(@config_files, Rails.root)
else
@config_files.map { |path| File.expand_path(path, Rails.root) }.each(&block)
end
end

def extract_worker_options!(config)
config.extract!(*WORKER_OPTIONS)
end

def load_config_file(path)
puts "Loading config file: #{path}" if @options[:verbose]
config = begin
Expand All @@ -262,20 +282,13 @@ def load_config_file(path)
raise Error, "Config should have an `exekutor` root node: #{path} (Found: #{config.keys.join(", ")})"
end

config
config[:exekutor]
end

def apply_config_file(config, worker_options)
# Remove worker specific options before calling Exekutor.config.set
worker_options.merge! config[:exekutor].extract!(:queue, :status_server_port)

convert_duration_options! config[:exekutor]

begin
Exekutor.config.set(**config[:exekutor])
rescue StandardError => e
raise Error, "Cannot load config file: #{path} (#{e})"
end
def apply_config_file(config)
Exekutor.config.set(**config)
rescue StandardError => e
raise Error, "Cannot load config file (#{e})"
end

def convert_duration_options!(config)
Expand Down Expand Up @@ -346,6 +359,7 @@ def to_a(identifier = nil)
"Minimum: 1, Maximum: Active record pool size minus 1, with a minimum of 1"
).freeze
DEFAULT_QUEUE = DefaultOptionValue.new("All queues").freeze
DEFAULT_PRIORITIES = DefaultOptionValue.new("All priorities").freeze
DEFAULT_FOREVER = DefaultOptionValue.new("Forever").freeze

DEFAULT_CONFIGURATION = { set_db_connection_name: true }.freeze
Expand Down
29 changes: 22 additions & 7 deletions lib/exekutor/internal/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ class Listener
# @param pool [ThreadPoolExecutor] the thread pool to use
# @param wait_timeout [Integer] the time to listen for notifications
# @param set_db_connection_name [Boolean] whether to set the application name on the DB connection
def initialize(worker_id:, provider:, pool:, queues: nil, wait_timeout: 60, set_db_connection_name: false)
def initialize(worker_id:, provider:, pool:, queues: nil, min_priority: nil, max_priority: nil, wait_timeout: 60,
set_db_connection_name: false)
super()
@config = {
worker_id: worker_id,
queues: queues || [],
queues: queues.presence,
min_priority: min_priority,
max_priority: max_priority,
wait_timeout: wait_timeout,
set_db_connection_name: set_db_connection_name
}
Expand Down Expand Up @@ -67,11 +70,22 @@ def provider_channel
PROVIDER_CHANNEL % @config[:worker_id]
end

# Whether this listener is listening to the given queue
# @return [Boolean]
# @return [Boolean] Whether the job matches the configured queues and priority range
def job_filter_match?(job_info)
listening_to_queue?(job_info["q"]) && listening_to_priority?(job_info["p"].to_i)
end

# @return [Boolean] Whether this listener is listening to the given queue
def listening_to_queue?(queue)
queues = @config[:queues]
queues.empty? || queues.include?(queue)
queues.nil? || queues.include?(queue)
end

# @return [Boolean] Whether this listener is listening to the given priority
def listening_to_priority?(priority)
minimum = @config[:min_priority]
maximum = @config[:max_priority]
(minimum.nil? || minimum <= priority) && (maximum.nil? || maximum >= priority)
end

# Starts the listener thread
Expand Down Expand Up @@ -129,8 +143,9 @@ def wait_for_jobs(connection)
JobParser.parse(payload)
rescue StandardError => e
logger.error e.message
nil
end
next unless job_info && listening_to_queue?(job_info["q"])
next unless job_info && job_filter_match?(job_info)

@provider.update_earliest_scheduled_at(job_info["t"].to_f)
end
Expand Down Expand Up @@ -177,7 +192,7 @@ def listening?

# Parses a NOTIFY payload to a job
class JobParser
JOB_INFO_KEYS = %w[id q t].freeze
JOB_INFO_KEYS = %w[id q p t].freeze

def self.parse(payload)
job_info = begin
Expand Down
1 change: 1 addition & 0 deletions lib/exekutor/internal/logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def self.backtrace_cleaner
# @return [String] The given error class, message, and cleaned backtrace as a string
def self.strferr(err)
raise ArgumentError, "err must not be nil" if err.nil?
return err if err.is_a? String

"#{err.class}#{err.message}\nat #{
err.backtrace ? backtrace_cleaner.clean(err.backtrace).join("\n ") : "unknown location"
Expand Down
52 changes: 42 additions & 10 deletions lib/exekutor/internal/reserver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ class Reserver
# Creates a new Reserver
# @param worker_id [String] the id of the worker
# @param queues [Array<String>] the queues to watch
def initialize(worker_id, queues)
def initialize(worker_id, queues: nil, min_priority: nil, max_priority: nil)
@worker_id = worker_id
@queue_filter_sql = build_queue_filter_sql(queues)
@reserve_filter_sql = build_filter_sql(queues: queues, min_priority: min_priority, max_priority: max_priority)
@json_serializer = Exekutor.config.load_json_serializer
end

Expand All @@ -26,7 +26,7 @@ def reserve(limit)
results = Exekutor::Job.connection.exec_query <<~SQL, ACTION_NAME, [@worker_id, limit], prepare: true
UPDATE exekutor_jobs SET worker_id = $1, status = 'e' WHERE id IN (
SELECT id FROM exekutor_jobs
WHERE scheduled_at <= now() AND "status"='p' #{@queue_filter_sql}
WHERE scheduled_at <= now() AND "status"='p'#{" AND #{@reserve_filter_sql}" if @reserve_filter_sql}
ORDER BY priority, scheduled_at, enqueued_at
FOR UPDATE SKIP LOCKED
LIMIT $2
Expand All @@ -51,7 +51,7 @@ def get_abandoned_jobs(active_job_ids)
# @return [Time,nil] The earliest scheduled at, or nil if the queues are empty
def earliest_scheduled_at
jobs = Exekutor::Job.pending
jobs.where! @queue_filter_sql.gsub(/^\s*AND\s+/, "") unless @queue_filter_sql.nil?
jobs.where! @reserve_filter_sql unless @reserve_filter_sql.nil?
jobs.minimum(:scheduled_at)
end

Expand All @@ -72,18 +72,49 @@ def parse_json(str)
@json_serializer.load str unless str.nil?
end

# Builds SQL filter for the given queues and priorities
def build_filter_sql(queues:, min_priority:, max_priority:)
filters = [
build_queue_filter_sql(queues),
build_priority_filter_sql(min_priority, max_priority)
]
filters.compact!
filters.join(" AND ") unless filters.empty?
end

# Builds SQL filter for the given queues
def build_queue_filter_sql(queues)
return nil if queues.nil? || (queues.is_a?(Array) && queues.empty?)

queues = queues.first if queues.is_a?(Array) && queues.one?
validate_queues! queues

if queues.is_a? Array
Exekutor::Job.sanitize_sql_for_conditions(["AND queue IN (?)", queues])
else
Exekutor::Job.sanitize_sql_for_conditions(["AND queue = ?", queues])
end
conditions = if queues.is_a? Array
["queue IN (?)", queues]
else
["queue = ?", queues]
end
Exekutor::Job.sanitize_sql_for_conditions conditions
end

# Builds SQL filter for the given priorities
def build_priority_filter_sql(minimum, maximum)
minimum = coerce_priority(minimum)
maximum = coerce_priority(maximum)

conditions = if minimum && maximum
["priority BETWEEN ? AND ?", minimum, maximum]
elsif minimum
["priority >= ?", minimum]
elsif maximum
["priority <= ?", maximum]
end
Exekutor::Job.sanitize_sql_for_conditions conditions if conditions
end

# @return [Integer,nil] returns nil unless +priority+ is between 1 and 32,766
def coerce_priority(priority)
priority if priority && (1..32_766).cover?(priority)
end

# Raises an error if the queues value is invalid
Expand All @@ -96,7 +127,8 @@ def validate_queues!(queues)
when String, Symbol
raise ArgumentError, "queue name cannot be empty" unless valid_queue_name? queues
else
raise ArgumentError, "queues must be nil, a String, Symbol, or an array of Strings or Symbols"
raise ArgumentError,
"queues must be nil, a String, Symbol, or an array of Strings or Symbols (Actual: #{queues.class})"
end
end

Expand Down
11 changes: 6 additions & 5 deletions lib/exekutor/internal/status_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ def stop
return unless @thread_running.value

server = @server.value
if server.respond_to? :shutdown
server.shutdown
elsif server.respond_to? :stop
server.stop
shutdown_method = %i[shutdown stop].find { |method| server.respond_to? method }
if shutdown_method
server.send(shutdown_method)
Exekutor.say "Status server stopped"
elsif server
Exekutor.say! "Cannot shutdown status server, #{server.class.name} does not respond to shutdown or stop"
Exekutor.print_error "Cannot shutdown status server, " \
"#{server.class.name} does not respond to `shutdown` or `stop`"
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/exekutor/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def executor_options(worker_options)
end

def create_provider(worker_options, executor, thread_pool)
@reserver = Internal::Reserver.new @record.id, worker_options[:queues]
@reserver = Internal::Reserver.new @record.id, **worker_options.slice(:queues, :min_priority, :max_priority)
provider = Internal::Provider.new reserver: @reserver, executor: executor, pool: thread_pool,
**provider_options(worker_options)

Expand Down
2 changes: 1 addition & 1 deletion lib/generators/exekutor/configuration_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class ConfigurationGenerator < Rails::Generators::Base
# Creates the configuration file at +config/exekutor.yml+. Uses the current worker configuration as the base.
def create_configuration_file
config = { queues: %w[queues to watch] }.merge(Exekutor.config.worker_options)
config[:status_port] = 8765
config[:status_port] = 12_677
config[:set_db_connection_name] = true
config[:wait_for_termination] = 120

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
CREATE
OR REPLACE FUNCTION exekutor_broadcast_job_enqueued() RETURNS TRIGGER AS $$
BEGIN
PERFORM
PERFORM
pg_notify('exekutor::job_enqueued',
CONCAT('id:', NEW.id,';q:', NEW.queue,';t:', extract ('epoch' from NEW.scheduled_at)));
CONCAT('id:', NEW.id,';q:', NEW.queue,';p:', NEW.priority, ';t:', extract ('epoch' from NEW.scheduled_at)));
RETURN NULL;
END;
$$
Expand Down
1 change: 1 addition & 0 deletions test/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def test_worker_options
polling_interval: config.polling_interval,
polling_jitter: config.polling_jitter,
status_server_handler: config.status_server_handler,
status_server_port: config.status_server_port,
healthcheck_timeout: config.healthcheck_timeout
}

Expand Down

0 comments on commit 0a44507

Please sign in to comment.