Skip to content

Commit

Permalink
Add configuration options to good_job executable
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Jul 15, 2020
1 parent 6367588 commit e44d06d
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 15 deletions.
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ $ bundle install
$ bundle exec good_job
```

Configuration options available with `help`:
```bash
$ bundle exec good_job help start

# Usage:
# good_job start
#
# Options:
# [--max-threads=N] # Maximum number of threads to use for working jobs (default: ActiveRecord::Base.connection_pool.size)
# [--queues=queue1,queue2] # Queues to work from. Separate multiple queues with commas (default: *)
# [--poll-interval=N] # Interval between polls for available jobs in seconds (default: 1)
```

### Configuring Job Execution Threads

GoodJob executes enqueued jobs using threads. There is a lot than can be said about [multithreaded behavior in Ruby on Rails](https://guides.rubyonrails.org/threading_and_code_execution.html), but briefly:
Expand Down
54 changes: 44 additions & 10 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,59 @@ module GoodJob
class CLI < Thor
RAILS_ENVIRONMENT_RB = File.expand_path("config/environment.rb")

desc :start, "Start jobs"
method_option :max_threads, type: :numeric
desc :start, "Start job worker"
method_option :max_threads,
type: :numeric,
desc: "Maximum number of threads to use for working jobs (default: ActiveRecord::Base.connection_pool.size)"
method_option :queues,
type: :string,
banner: "queue1,queue2",
desc: "Queues to work from. Separate multiple queues with commas (default: *)"
method_option :poll_interval,
type: :numeric,
desc: "Interval between polls for available jobs in seconds (default: 1)"
def start
require RAILS_ENVIRONMENT_RB

max_threads = options[:max_threads] ||
ENV['GOOD_JOB_MAX_THREADS'] ||
ENV['RAILS_MAX_THREADS'] ||
ActiveRecord::Base.connection_pool.size
max_threads = (
options[:max_threads] ||
ENV['GOOD_JOB_MAX_THREADS'] ||
ENV['RAILS_MAX_THREADS'] ||
ActiveRecord::Base.connection_pool.size
).to_i

$stdout.puts "GoodJob starting with max_threads=#{max_threads}"
queue_names = (
options[:queues] ||
ENV['GOOD_JOB_QUEUES'] ||
'*'
).split(',').map(&:strip)

job_performer = GoodJob::Job.only_scheduled.priority_ordered.to_performer
scheduler = GoodJob::Scheduler.new(job_performer, pool_options: { max_threads: max_threads })
poll_interval = (
options[:poll_interval] ||
ENV['GOOD_JOB_POLL_INTERVAL']
).to_i

job_query = GoodJob::Job.all
queue_names_without_all = queue_names.reject { |q| q == '*' }
job_query = job_query.where(queue_name: queue_names_without_all) unless queue_names_without_all.size.zero?

job_performer = job_query.only_scheduled.priority_ordered.to_performer

$stdout.puts "GoodJob worker starting with max_threads=#{max_threads} on queues=#{queue_names.join(',')}"

timer_options = {}
timer_options[:execution_interval] = poll_interval.to_i if poll_interval.present?

pool_options = {
max_threads: max_threads,
}

scheduler = GoodJob::Scheduler.new(job_performer, timer_options: timer_options, pool_options: pool_options)

@stop_good_job_executable = false
%w[INT TERM].each do |signal|
trap(signal) { @stop_good_job_executable = true }
end
@stop_good_job_executable = false

Kernel.loop do
sleep 0.1
Expand Down
5 changes: 1 addition & 4 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Job < ActiveRecord::Base

scope :only_scheduled, -> { where("scheduled_at < ?", Time.current).or(where(scheduled_at: nil)) }
scope :priority_ordered, -> { order(priority: :desc) }
scope :to_performer, -> { Performer.new(self) }

class Performer
def initialize(query)
Expand All @@ -26,10 +27,6 @@ def next
end
end

def self.to_performer
Performer.new(self)
end

def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
good_job = nil
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
Expand Down
25 changes: 24 additions & 1 deletion spec/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

RSpec.describe GoodJob::CLI do
let(:scheduler_mock) { instance_double GoodJob::Scheduler, shutdown?: false, shutdown: nil }
let(:env) { {} }
let(:args) { [] }

before do
stub_const 'GoodJob::CLI::RAILS_ENVIRONMENT_RB', File.expand_path("spec/dummy/config/environment.rb")
Expand Down Expand Up @@ -55,7 +57,28 @@
cli.start
end.to output.to_stdout

expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Job::Performer), pool_options: { max_threads: 4 })
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Job::Performer), pool_options: { max_threads: 4 }, timer_options: {})
end
end

describe 'queues' do
before { allow(Kernel).to receive(:loop) }

around { |example| freeze_time { example.run } }

it 'defaults to --queues, GOOD_JOB_QUEUES, all queues' do
cli = described_class.new([], { queues: 'mice,elephant' }, {})
stub_const 'ENV', ENV.to_hash.merge({ 'GOOD_JOB_QUEUES' => 'elephant,whale' })

allow(GoodJob::Scheduler).to receive(:new) do |performer, _options|
performer_query = performer.instance_variable_get(:@query)
expect(performer_query.to_sql).to eq GoodJob::Job.where(queue_name: %w[mice elephant]).only_scheduled.priority_ordered.to_sql

scheduler_mock
end

expect { cli.start }.to output.to_stdout
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Job::Performer), a_kind_of(Hash))
end
end
end
Expand Down
1 change: 1 addition & 0 deletions spec/support/time_helpers.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
RSpec.configure { |c| c.include ActiveSupport::Testing::TimeHelpers }

0 comments on commit e44d06d

Please sign in to comment.