Skip to content

Commit

Permalink
Updated some docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Roy committed May 14, 2023
1 parent 6afce18 commit 8851537
Show file tree
Hide file tree
Showing 16 changed files with 67 additions and 15 deletions.
2 changes: 1 addition & 1 deletion lib/exekutor/asynchronous.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def self.validate_args(delegate, method, *args, **kwargs)

# The internal job used for {Exekutor::Asynchronous}. Only works for methods that are marked as asynchronous to
# prevent remote code execution. Include the {Exekutor::Asynchronous} and call
# {Exekutor::Asynchronous#perform_asynchronously} to mark a method as asynchronous.
# +perform_asynchronously+ to mark a method as asynchronous.
class AsyncMethodJob < ActiveJob::Base # rubocop:disable Rails/ApplicationJob
# Calls the original, synchronous method
# @!visibility private
Expand Down
6 changes: 4 additions & 2 deletions lib/exekutor/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative "internal/configuration_builder"

# The Exekutor namespace
module Exekutor
# Configuration for the Exekutor library
class Configuration
Expand Down Expand Up @@ -88,7 +89,7 @@ def load_json_serializer
end

serializer = const_get :json_serializer
serializer = SerializerValidator.try_convert! serializer unless SerializerValidator.valid? serializer
serializer = SerializerValidator.convert! serializer unless SerializerValidator.valid? serializer
@json_serializer_instance = [raw_value, serializer]
serializer
end
Expand Down Expand Up @@ -348,10 +349,11 @@ def self.valid?(serializer)
serializer.respond_to?(:dump) && serializer.respond_to?(:load)
end

# Tries to convert the specified value to a serializer, raises an error if the conversion fails.
# @param serializer [Any] the value to convert
# @return [#dump&#load]
# @raise [Error] if the serializer has not implemented dump & load
def self.try_convert!(serializer)
def self.convert!(serializer)
return serializer if SerializerValidator.valid? serializer

if serializer.respond_to?(:call)
Expand Down
9 changes: 5 additions & 4 deletions lib/exekutor/hook.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ module Hook
included do
class_attribute :__callbacks, default: Hash.new { |h, k| h[k] = [] }

private_class_method :add_callback!
private_class_method :_add_callback
end

# Gets the registered callbacks
Expand Down Expand Up @@ -141,7 +141,7 @@ def callbacks
CALLBACK_NAMES.each do |name|
module_eval <<-RUBY, __FILE__, __LINE__ + 1
def #{name}(*methods, &callback) # def callback_name(*methods, &callback
add_callback! :#{name}, methods, callback # add_callback! :callback_name, methods, callback
_add_callback :#{name}, methods, callback # _add_callback :callback_name, methods, callback
end # end
RUBY
end
Expand All @@ -155,11 +155,12 @@ def add_callback(type, *methods, &callback)
raise Error, "Invalid callback type: #{type} (Expected one of: #{CALLBACK_NAMES.map(&:inspect).join(", ")}"
end

add_callback! type, methods, callback
_add_callback type, methods, callback
true
end

def add_callback!(type, methods, callback)
# @!visibility private
def _add_callback(type, methods, callback)
raise Error, "Either a method or a callback block must be supplied" if methods.present? && callback.present?

if methods.present?
Expand Down
15 changes: 12 additions & 3 deletions lib/exekutor/internal/cli/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
require_relative "default_option_value"
require_relative "daemon"

# rubocop:disable Style/FormatStringToken
module Exekutor
# @private
module Internal
Expand All @@ -26,7 +25,6 @@ def initialize(options)
# @option options [String] :threads The number of threads to use for job execution
# @option options [Integer] :poll_interval The interval in seconds for job polling
# @return [Void]

def start(options)
Process.setproctitle "Exekutor worker (Initializing…) [#{$PROGRAM_NAME}]"
daemonize(restarting: options[:restart]) if options[:daemonize]
Expand All @@ -43,6 +41,8 @@ def start(options)
end
end

# Stops a daemonized worker
# @return [Void]
def stop(options)
daemon = Daemon.new(pidfile: pidfile)
pid = daemon.pid
Expand All @@ -64,6 +64,8 @@ def stop(options)
puts "Worker (PID: #{pid}) stopped." unless quiet?
end

# Restarts a daemonized worker
# @return [Void]
def restart(stop_options, start_options)
stop stop_options.merge(restart: true)
start start_options.merge(restart: true, daemonize: true)
Expand Down Expand Up @@ -215,6 +217,8 @@ def identifier
@global_options[:identifier]
end

# rubocop:disable Style/FormatStringToken

# @return [String] The path to the pidfile
def pidfile
pidfile = @global_options[:pidfile] || DEFAULT_PIDFILE
Expand Down Expand Up @@ -260,6 +264,8 @@ def initialize
super("tmp/pids/exekutor[.%{identifier}].pid")
end

# @param identifier [nil,String] the worker identifier
# @return [String] the path to the default pidfile of the worker with the specified identifier
def for_identifier(identifier)
if identifier.nil? || identifier.empty? # rubocop:disable Rails/Blank – Rails is not loaded here
"tmp/pids/exekutor.pid"
Expand All @@ -277,6 +283,8 @@ def initialize
DESC
end

# @param identifier [nil,String] the worker identifier
# @return [Array<String>] the paths to the configfiles to load
def to_a(identifier = nil)
files = []
%w[config/exekutor.yml config/exekutor.yaml].each do |path|
Expand All @@ -299,6 +307,8 @@ def to_a(identifier = nil)
end
end

# rubocop:enable Style/FormatStringToken

DEFAULT_PIDFILE = DefaultPidFileValue.new.freeze
DEFAULT_CONFIG_FILES = DefaultConfigFileValue.new.freeze

Expand All @@ -315,4 +325,3 @@ class Error < StandardError; end
end
end
end
# rubocop:enable Style/FormatStringToken
4 changes: 4 additions & 0 deletions lib/exekutor/internal/database_connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ def self.set_application_name(pg_conn, id, process = nil)
end

# The connection name for the specified worker id and process
# @param id [String] the id of the worker
# @param process [nil,String] the process name
def self.application_name(id, process = nil)
"Exekutor[id: #{id}]#{" #{process}" if process}"
end

# Reconnects the database if it is not active
# @param connection [ActiveRecord::ConnectionAdapters::AbstractAdapter] the connection adapter to use
def self.ensure_active!(connection = BaseRecord.connection)
connection.reconnect! unless connection.active?
end
Expand Down
4 changes: 3 additions & 1 deletion lib/exekutor/internal/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Executable
# Possible states
STATES = %i[pending started stopped crashed killed].freeze

# Initializes the internal variables
def initialize
@state = Concurrent::AtomicReference.new(:pending)
@consecutive_errors = Concurrent::AtomicFixnum.new(0)
Expand All @@ -25,11 +26,12 @@ def state
@state.get
end

# Whether the state equals +:started+
# @return [Boolean] whether the state equals +:started+
def running?
@state.get == :started
end

# @return [Concurrent::AtomicFixnum] the number of consecutive errors that have occurred
def consecutive_errors
@consecutive_errors
end
Expand Down
1 change: 1 addition & 0 deletions lib/exekutor/internal/hooks.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

# The Exekutor namespace
module Exekutor
module Internal
# The internal implementation of the Exekutor hooks
Expand Down
2 changes: 1 addition & 1 deletion lib/exekutor/internal/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def self.parse(payload)
# Raised when an error occurs in the listener.
class Error < Exekutor::Error; end

# Raised when the database connection is not an instance of PG::Connection.
# Raised when the database connection is not an instance of +PG::Connection+.
class UnsupportedDatabase < Exekutor::Error; end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/exekutor/internal/logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require "rainbow"

# The Exekutor namespace
module Exekutor
# @private
module Internal
Expand Down
2 changes: 1 addition & 1 deletion lib/exekutor/internal/provider.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def poll
# argument is given. Updates the timestamp for the earliest job is a timestamp is given and that timestamp is
# before the known timestamp. Does nothing if a timestamp is given and the earliest job timestamp is not known.
# @param scheduled_at [Time,Numeric] the time a job is scheduled at
# @return [float,nil] the timestamp for the next job, or +nil+ if the timestamp is unknown or no jobs are pending
# @return [Float,nil] the timestamp for the next job, or +nil+ if the timestamp is unknown or no jobs are pending
def update_earliest_scheduled_at(scheduled_at = UNKNOWN)
scheduled_at = scheduled_at.to_f if scheduled_at.is_a? Time
unless scheduled_at == UNKNOWN || scheduled_at.is_a?(Numeric)
Expand Down
5 changes: 4 additions & 1 deletion lib/exekutor/internal/reserver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def initialize(worker_id, queues)

# Reserves pending jobs
# @param limit [Integer] the number of jobs to reserve
# @return [Array<Job>,nil] the reserved jobs, or nil if no jobs were reserved
# @return [Array<Hash>,nil] the reserved jobs, or nil if no jobs were reserved
def reserve(limit)
return unless limit.positive?

Expand All @@ -37,6 +37,9 @@ def reserve(limit)
parse_jobs results
end

# Gets the jobs that are assigned to this worker and have an id that is not included in +active_job_ids+
# @param active_job_ids [Array<String>] The ids of the jobs that should be excluded
# @return [Array<Hash>] the jobs
def get_abandoned_jobs(active_job_ids)
jobs = Exekutor::Job.executing.where(worker_id: @worker_id)
jobs = jobs.where.not(id: active_job_ids) if active_job_ids.present?
Expand Down
20 changes: 19 additions & 1 deletion lib/exekutor/internal/status_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,21 @@

module Exekutor
module Internal
# Serves a simple health check app
# Serves a simple health check app. The app provides 4 endpoints:
# - +/+, which lists the other endpoints;
# - +/ready+, which indicates whether the worker is ready to start work;
# - +/live+, which indicates whether the worker is ready and whether the worker is still alive;
# - +/threads+, which indicated the thread usage of the worker.
#
# Please note that this server uses +webrick+ by default, which is no longer a default gem from ruby 3.0 onwards.
#
# === Example requests
# $ curl localhost:9000/ready
# [OK] ID: f1a2ee6a-cdac-459c-a4b8-de7c6a8bbae6; State: started
# $ curl localhost:9000/live
# [OK] ID: f1a2ee6a-cdac-459c-a4b8-de7c6a8bbae6; State: started; Heartbeat: 2023-04-05T16:27:00Z
# $ curl localhost:9000/threads
# {"minimum":1,"maximum":10,"available":4,"usage_percent":60.0}
class StatusServer
include Internal::Logger
include Internal::Executable
Expand All @@ -20,16 +34,19 @@ def initialize(worker:, pool:, port:, handler: DEFAULT_HANDLER, heartbeat_timeou
@server = Concurrent::AtomicReference.new
end

# Starts the web server
def start
return false unless compare_and_set_state :pending, :started

start_thread
end

# @return [Boolean] whether the web server is active
def running?
super && @thread_running.value
end

# Stops the web server
def stop
self.state = :stopped
return unless @thread_running.value
Expand All @@ -46,6 +63,7 @@ def stop

protected

# Runs the web server, should be called from a separate thread
def run(worker, port)
return unless state == :started && @thread_running.make_true

Expand Down
1 change: 1 addition & 0 deletions lib/exekutor/plugins.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

# The Exekutor namespace
module Exekutor
module Plugins
# Raised when a plugin cannot be loaded
Expand Down
7 changes: 7 additions & 0 deletions lib/exekutor/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,17 @@ def id
@record.id
end

# @return [Time,nil] The timestamp of the last heartbeat. The timestamp is truncated to whole minutes.
def last_heartbeat
@record.last_heartbeat_at
end

# Returns the thread usage for this worker. The resulting hash will contain the following key-value pairs:
# - +:minimum+, (Integer) the minimum number of threads that should be active;
# - +:maximum+, (Integer) the maximum number of threads may should be active;
# - +:available+, (Integer) the number of threads that are available to execute new jobs;
# - +:usage_percent+, (Float, 0-100) the percentage of workers that are currently busy executing jobs.
# @return [Hash] the thread usage
def thread_stats
available = @executor.available_threads
usage_percent = (((100 - (available * 100.0 / @executor.maximum_threads))).round(2) if @executor.running?)
Expand Down
1 change: 1 addition & 0 deletions lib/generators/exekutor/configuration_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class ConfigurationGenerator < Rails::Generators::Base

class_option :identifier, type: :string, aliases: %i[--id], desc: "The worker identifier"

# Creates the configuration file at +config/exekutor.yml+. Uses the current worker configuration as the base.
def create_configuration_file
config = { queues: %w[queues to watch] }.merge(Exekutor.config.worker_options)
config[:status_port] = 8765
Expand Down
2 changes: 2 additions & 0 deletions lib/generators/exekutor/install_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ class InstallGenerator < Rails::Generators::Base
TEMPLATE_DIR = File.join(__dir__, "templates/install")
source_paths << TEMPLATE_DIR

# Creates the initializer file at +config/initializers/exekutor.rb+
def create_initializer_file
template "initializers/exekutor.rb.erb", "config/initializers/exekutor.rb"
end

# Creates the migration file in the migrations folder
def create_migration_file
migration_template "migrations/create_exekutor_schema.rb.erb",
File.join(db_migrate_path, "create_exekutor_schema.rb")
Expand Down

0 comments on commit 8851537

Please sign in to comment.