Skip to content

Commit

Permalink
Use unique index on [cron_key, cron_at] columns to prevent duplicate …
Browse files Browse the repository at this point in the history
…cron jobs from being enqueued (#423)
  • Loading branch information
bensheldon committed Oct 25, 2021
1 parent c8828e5 commit ec73ea3
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 6 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ job.good_job_concurrency_key #=> "Unique-Alice"

GoodJob can enqueue jobs on a recurring basis that can be used as a replacement for cron.

Cron-style jobs are run on every GoodJob process (e.g. CLI or `async` execution mode) when `config.good_job.enable_cron = true`; use GoodJob's [ActiveJob concurrency](#activejob-concurrency) extension to limit the number of jobs that are enqueued.
Cron-style jobs are run on every GoodJob process (e.g. CLI or `async` execution mode) when `config.good_job.enable_cron = true`, but GoodJob's cron uses unique indexes to ensure that only a single job is enqeued at the given time interval.

Cron-format is parsed by the [`fugit`](https://github.com/floraison/fugit) gem, which has support for seconds-level resolution (e.g. `* * * * * *`).

Expand Down
2 changes: 1 addition & 1 deletion engine/app/views/good_job/cron_schedules/index.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
</td>
<td class="font-monospace"><%= cron_entry.job_class %></td>
<td><%= cron_entry.description %></td>
<td><%= cron_entry.next_at.to_local_time %></td>
<td><%= cron_entry.next_at %></td>
</tr>
<% end %>
</tbody>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.text :concurrency_key
t.text :cron_key
t.uuid :retried_good_job_id
t.timestamp :cron_at
end

add_index :good_jobs, :scheduled_at, where: "(finished_at IS NULL)", name: "index_good_jobs_on_scheduled_at"
add_index :good_jobs, [:queue_name, :scheduled_at], where: "(finished_at IS NULL)", name: :index_good_jobs_on_queue_name_and_scheduled_at
add_index :good_jobs, [:active_job_id, :created_at], name: :index_good_jobs_on_active_job_id_and_created_at
add_index :good_jobs, :concurrency_key, where: "(finished_at IS NULL)", name: :index_good_jobs_on_concurrency_key_when_unfinished
add_index :good_jobs, [:cron_key, :created_at], name: :index_good_jobs_on_cron_key_and_created_at
add_index :good_jobs, [:cron_key, :cron_at], name: :index_good_jobs_on_cron_key_and_cron_at, unique: true
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true
class AddCronAtToGoodJobs < ActiveRecord::Migration<%= migration_version %>
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.column_exists?(:good_jobs, :cron_at)
end
end

add_column :good_jobs, :cron_at, :timestamp
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# frozen_string_literal: true
class AddCronKeyCronAtIndexToGoodJobs < ActiveRecord::Migration<%= migration_version %>
disable_ddl_transaction!

def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_cron_key_and_cron_at)
end
end

add_index :good_jobs,
[:cron_key, :cron_at],
algorithm: :concurrently,
name: :index_good_jobs_on_cron_key_and_cron_at,
unique: true
end
end
4 changes: 3 additions & 1 deletion lib/good_job/cron_entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ def description

def next_at
fugit = Fugit::Cron.parse(cron)
fugit.next_time
fugit.next_time.to_t
end

def enqueue
job_class.constantize.set(set_value).perform_later(*args_value)
rescue ActiveRecord::RecordNotUnique
false
end

private
Expand Down
6 changes: 4 additions & 2 deletions lib/good_job/cron_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ def shutdown?
# Enqueues a scheduled task
# @param cron_entry [CronEntry] the CronEntry object to schedule
def create_task(cron_entry)
delay = [(cron_entry.next_at - Time.current).to_f, 0].max
future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry]) do |thr_scheduler, thr_cron_entry|
cron_at = cron_entry.next_at
delay = [(cron_at - Time.current).to_f, 0].max
future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at]) do |thr_scheduler, thr_cron_entry, thr_cron_at|
# Re-schedule the next cron task before executing the current task
thr_scheduler.create_task(thr_cron_entry)

Rails.application.executor.wrap do
CurrentThread.reset
CurrentThread.cron_key = thr_cron_entry.key
CurrentThread.cron_at = thr_cron_at

cron_entry.enqueue
end
Expand Down
7 changes: 7 additions & 0 deletions lib/good_job/current_thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ module GoodJob
# Thread-local attributes for passing values from Instrumentation.
# (Cannot use ActiveSupport::CurrentAttributes because ActiveJob resets it)
module CurrentThread
# @!attribute [rw] cron_at
# @!scope class
# Cron At
# @return [DateTime, nil]
thread_mattr_accessor :cron_at

# @!attribute [rw] cron_key
# @!scope class
# Cron Key
Expand Down Expand Up @@ -32,6 +38,7 @@ module CurrentThread
# Resets attributes
# @return [void]
def self.reset
self.cron_at = nil
self.cron_key = nil
self.execution = nil
self.error_on_discard = nil
Expand Down
18 changes: 18 additions & 0 deletions lib/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ def self.queue_parser(string)
end
end

def self._migration_pending_warning
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
GoodJob has pending database migrations. To create the migration files, run:
rails generate good_job:update
To apply the migration files, run:
rails db:migrate
DEPRECATION
nil
end

# Get Jobs with given ActiveJob ID
# @!method active_job_id
# @!scope class
Expand Down Expand Up @@ -225,6 +235,14 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false

if CurrentThread.cron_key
execution_args[:cron_key] = CurrentThread.cron_key

@cron_at_index = column_names.include?('cron_at') && connection.index_name_exists?(:good_jobs, :index_good_jobs_on_cron_key_and_cron_at) unless instance_variable_defined?(:@cron_at_index)

if @cron_at_index
execution_args[:cron_at] = CurrentThread.cron_at
else
_migration_pending_warning
end
elsif CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
execution_args[:cron_key] = CurrentThread.execution.cron_key
end
Expand Down
14 changes: 14 additions & 0 deletions spec/lib/good_job/cron_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,28 @@
wait_until(max: 5) do
expect(GoodJob::Execution.count).to be > 3
end
cron_manager.shutdown

execution = GoodJob::Execution.first
expect(execution).to have_attributes(
cron_key: 'example',
priority: -10
)
end

it 'only inserts unique jobs when multiple CronManagers are running' do
cron_manager = described_class.new(cron_entries, start_on_initialize: true)
other_cron_manager = described_class.new(cron_entries, start_on_initialize: true)

wait_until(max: 5) do
expect(GoodJob::Execution.count).to be > 3
end

cron_manager.shutdown
other_cron_manager.shutdown

executions = GoodJob::Execution.all.to_a
expect(executions.size).to eq executions.map(&:cron_at).uniq.size
end
end
end
1 change: 1 addition & 0 deletions spec/lib/good_job/current_thread_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

RSpec.describe GoodJob::CurrentThread do
[
:cron_at,
:cron_key,
:execution,
:error_on_discard,
Expand Down
1 change: 1 addition & 0 deletions spec/test_app/config/environments/development.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
config.active_job.queue_adapter = :good_job
GoodJob.retry_on_unhandled_error = false
GoodJob.preserve_job_records = true
GoodJob.on_thread_error = -> (error) { Rails.logger.warn(error) }

config.good_job.enable_cron = ActiveModel::Type::Boolean.new.cast(ENV.fetch('GOOD_JOB_ENABLE_CRON', true))
config.good_job.cron = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true
class AddCronAtToGoodJobs < ActiveRecord::Migration[5.2]
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.column_exists?(:good_jobs, :cron_at)
end
end

add_column :good_jobs, :cron_at, :timestamp
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# frozen_string_literal: true
class AddCronKeyCronAtIndexToGoodJobs < ActiveRecord::Migration[5.2]
disable_ddl_transaction!

def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_cron_key_and_cron_at)
end
end

add_index :good_jobs,
[:cron_key, :cron_at],
algorithm: :concurrently,
name: :index_good_jobs_on_cron_key_and_cron_at,
unique: true
end
end
4 changes: 3 additions & 1 deletion spec/test_app/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2021_08_15_160735) do
ActiveRecord::Schema.define(version: 2021_10_11_221038) do

# These are extensions that must be enabled in order to support this database
enable_extension "pgcrypto"
Expand All @@ -30,9 +30,11 @@
t.text "concurrency_key"
t.text "cron_key"
t.uuid "retried_good_job_id"
t.datetime "cron_at"
t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at"
t.index ["concurrency_key"], name: "index_good_jobs_on_concurrency_key_when_unfinished", where: "(finished_at IS NULL)"
t.index ["cron_key", "created_at"], name: "index_good_jobs_on_cron_key_and_created_at"
t.index ["cron_key", "cron_at"], name: "index_good_jobs_on_cron_key_and_cron_at", unique: true
t.index ["queue_name", "scheduled_at"], name: "index_good_jobs_on_queue_name_and_scheduled_at", where: "(finished_at IS NULL)"
t.index ["scheduled_at"], name: "index_good_jobs_on_scheduled_at", where: "(finished_at IS NULL)"
end
Expand Down

0 comments on commit ec73ea3

Please sign in to comment.