diff --git a/.rubocop.yml b/.rubocop.yml index 97747b7d..19ce7522 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -22,6 +22,10 @@ Rails/EnvironmentVariableAccess: RSpec/IndexedLet: Enabled: false +Layout/LineLength: + Exclude: + - 'spec/**/*' + Rails/SquishedSQLHeredocs: Exclude: - 'spec/**/*' diff --git a/lib/delayed/monitor.rb b/lib/delayed/monitor.rb index d9663f92..63b925e6 100644 --- a/lib/delayed/monitor.rb +++ b/lib/delayed/monitor.rb @@ -35,6 +35,33 @@ def query_for(metric) send(:"#{metric}_grouped") end + def self.sql_now_in_utc + case ActiveRecord::Base.connection.adapter_name + when 'PostgreSQL' + "TIMEZONE('UTC', NOW())" + when 'MySQL', 'Mysql2' + "UTC_TIMESTAMP()" + else + "CURRENT_TIMESTAMP" + end + end + + def self.parse_utc_time(string) + # Depending on Rails version & DB adapter, this will be either a String or a DateTime. + # If it's a DateTime, and if connection is running with the `:local` time zone config, + # then by default Rails incorrectly assumes it's in local time instead of UTC. + # We use `strftime` to strip the encoded TZ info and re-parse it as UTC. + # + # Example: + # - "2026-02-05 10:01:23" -> DB-returned string + # - "2026-02-05 10:01:23 -0600" -> Rails-parsed DateTime with incorrect TZ + # - "2026-02-05 10:01:23" -> `strftime` output + # - "2026-02-05 04:01:23 -0600" -> Re-parsed as UTC and converted to local time + string = string.strftime('%Y-%m-%d %H:%M:%S') if string.respond_to?(:strftime) + + ActiveSupport::TimeZone.new("UTC").parse(string) + end + private attr_reader :jobs @@ -75,7 +102,15 @@ def grouped_count(scope) def grouped_min(scope, column) Delayed::Job.from(scope.select("priority, queue, MIN(#{column}) AS #{column}")) - .group(priority_case_statement, :queue).minimum(column) + .group(priority_case_statement, :queue) + .select(<<~SQL.squish) + (#{priority_case_statement}) AS priority, + queue, + MIN(#{column}) AS #{column}, + #{self.class.sql_now_in_utc} AS db_now_utc + SQL + .group_by { |j| [j.priority.to_i, j.queue] } + .transform_values(&:first) end def count_grouped @@ -107,16 +142,16 @@ def failed_count_grouped end def max_lock_age_grouped - oldest_locked_job_grouped.transform_values { |locked_at| Job.db_time_now - locked_at } + oldest_locked_at_query.transform_values { |j| db_now(j) - j.locked_at } end def max_age_grouped - oldest_workable_job_grouped.transform_values { |run_at| Job.db_time_now - run_at } + oldest_run_at_query.transform_values { |j| db_now(j) - j.run_at } end def alert_age_percent_grouped - oldest_workable_job_grouped.each_with_object({}) do |((priority, queue), run_at), metrics| - max_age = Job.db_time_now - run_at + oldest_run_at_query.each_with_object({}) do |((priority, queue), j), metrics| + max_age = db_now(j) - j.run_at alert_age = Priority.new(priority).alert_age metrics[[priority, queue]] = [max_age / alert_age * 100, 100].min if alert_age end @@ -129,11 +164,23 @@ def workable_count_grouped alias working_count_grouped locked_count_grouped def oldest_locked_job_grouped - grouped_min(jobs.claimed, :locked_at) + oldest_locked_at_query.transform_values(&:locked_at) end def oldest_workable_job_grouped - @memo[:oldest_workable_job_grouped] ||= grouped_min(jobs.claimable, :run_at) + oldest_run_at_query.transform_values(&:run_at) + end + + def oldest_locked_at_query + @memo[:oldest_locked_at_query] ||= grouped_min(jobs.claimed, :locked_at) + end + + def oldest_run_at_query + @memo[:oldest_run_at_query] ||= grouped_min(jobs.claimable, :run_at) + end + + def db_now(record) + self.class.parse_utc_time(record.db_now_utc) end def priority_case_statement diff --git a/spec/delayed/__snapshots__/monitor_spec.rb.snap b/spec/delayed/__snapshots__/monitor_spec.rb.snap index 7b4e02f6..5ee5ff1f 100644 --- a/spec/delayed/__snapshots__/monitor_spec.rb.snap +++ b/spec/delayed/__snapshots__/monitor_spec.rb.snap @@ -318,10 +318,10 @@ GroupAggregate (cost=...) SNAP snapshots["runs the expected postgresql query for max_lock_age 1"] = <<-SNAP -SELECT MIN(locked_at) AS minimum_locked_at, CASE WHEN priority >= 0 +SELECT (CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 - AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"queue\" AS queue + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority, queue, MIN(locked_at) AS locked_at, TIMEZONE('UTC', NOW()) AS db_now_utc FROM (SELECT priority, queue, MIN(locked_at) AS locked_at FROM \"delayed_jobs\" WHERE \"delayed_jobs\".\"locked_at\" >= '2025-11-10 16:59:43' @@ -336,7 +336,7 @@ SNAP snapshots["produces the expected postgresql query plan for max_lock_age 1"] = <<-SNAP GroupAggregate (cost=...) - Output: min(subquery.locked_at), (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue + Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.locked_at), timezone('UTC'::text, now()) Group Key: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue -> Sort (cost=...) Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, subquery.locked_at @@ -357,7 +357,7 @@ SNAP snapshots["[legacy index] produces the expected postgresql query plan for max_lock_age 1"] = <<-SNAP GroupAggregate (cost=...) - Output: min(subquery.locked_at), (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue + Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.locked_at), timezone('UTC'::text, now()) Group Key: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue -> Sort (cost=...) Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, subquery.locked_at @@ -377,10 +377,10 @@ GroupAggregate (cost=...) SNAP snapshots["runs the expected postgresql query for max_age 1"] = <<-SNAP -SELECT MIN(run_at) AS minimum_run_at, CASE WHEN priority >= 0 +SELECT (CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 - AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"queue\" AS queue + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority, queue, MIN(run_at) AS run_at, TIMEZONE('UTC', NOW()) AS db_now_utc FROM (SELECT priority, queue, MIN(run_at) AS run_at FROM \"delayed_jobs\" WHERE (\"delayed_jobs\".\"locked_at\" IS NULL @@ -396,7 +396,7 @@ SNAP snapshots["produces the expected postgresql query plan for max_age 1"] = <<-SNAP GroupAggregate (cost=...) - Output: min(subquery.run_at), (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue + Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.run_at), timezone('UTC'::text, now()) Group Key: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue -> Sort (cost=...) Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, subquery.run_at @@ -417,7 +417,7 @@ SNAP snapshots["[legacy index] produces the expected postgresql query plan for max_age 1"] = <<-SNAP GroupAggregate (cost=...) - Output: min(subquery.run_at), (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue + Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.run_at), timezone('UTC'::text, now()) Group Key: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue -> Sort (cost=...) Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, subquery.run_at @@ -556,10 +556,10 @@ GroupAggregate (cost=...) SNAP snapshots["runs the expected postgresql query for alert_age_percent 1"] = <<-SNAP -SELECT MIN(run_at) AS minimum_run_at, CASE WHEN priority >= 0 +SELECT (CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 - AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"queue\" AS queue + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority, queue, MIN(run_at) AS run_at, TIMEZONE('UTC', NOW()) AS db_now_utc FROM (SELECT priority, queue, MIN(run_at) AS run_at FROM \"delayed_jobs\" WHERE (\"delayed_jobs\".\"locked_at\" IS NULL @@ -575,7 +575,7 @@ SNAP snapshots["produces the expected postgresql query plan for alert_age_percent 1"] = <<-SNAP GroupAggregate (cost=...) - Output: min(subquery.run_at), (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue + Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.run_at), timezone('UTC'::text, now()) Group Key: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue -> Sort (cost=...) Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, subquery.run_at @@ -596,7 +596,7 @@ SNAP snapshots["[legacy index] produces the expected postgresql query plan for alert_age_percent 1"] = <<-SNAP GroupAggregate (cost=...) - Output: min(subquery.run_at), (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue + Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.run_at), timezone('UTC'::text, now()) Group Key: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue -> Sort (cost=...) Output: (CASE WHEN ((subquery.priority >= 0) AND (subquery.priority < 10)) THEN 0 WHEN ((subquery.priority >= 10) AND (subquery.priority < 20)) THEN 10 WHEN ((subquery.priority >= 20) AND (subquery.priority < 30)) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, subquery.run_at @@ -795,10 +795,10 @@ USE TEMP B-TREE FOR GROUP BY SNAP snapshots["runs the expected sqlite3 query for max_lock_age 1"] = <<-SNAP -SELECT MIN(locked_at) AS minimum_locked_at, CASE WHEN priority >= 0 +SELECT (CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 - AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"queue\" AS queue + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority, queue, MIN(locked_at) AS locked_at, CURRENT_TIMESTAMP AS db_now_utc FROM (SELECT priority, queue, MIN(locked_at) AS locked_at FROM \"delayed_jobs\" WHERE \"delayed_jobs\".\"locked_at\" >= '2025-11-10 16:59:43' @@ -828,10 +828,10 @@ USE TEMP B-TREE FOR GROUP BY SNAP snapshots["runs the expected sqlite3 query for max_age 1"] = <<-SNAP -SELECT MIN(run_at) AS minimum_run_at, CASE WHEN priority >= 0 +SELECT (CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 - AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"queue\" AS queue + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority, queue, MIN(run_at) AS run_at, CURRENT_TIMESTAMP AS db_now_utc FROM (SELECT priority, queue, MIN(run_at) AS run_at FROM \"delayed_jobs\" WHERE (\"delayed_jobs\".\"locked_at\" IS NULL @@ -929,10 +929,10 @@ USE TEMP B-TREE FOR GROUP BY SNAP snapshots["runs the expected sqlite3 query for alert_age_percent 1"] = <<-SNAP -SELECT MIN(run_at) AS minimum_run_at, CASE WHEN priority >= 0 +SELECT (CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 - AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, \"queue\" AS queue + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority, queue, MIN(run_at) AS run_at, CURRENT_TIMESTAMP AS db_now_utc FROM (SELECT priority, queue, MIN(run_at) AS run_at FROM \"delayed_jobs\" WHERE (\"delayed_jobs\".\"locked_at\" IS NULL @@ -1149,10 +1149,10 @@ snapshots["[legacy index] produces the expected mysql2 query plan for failed_cou SNAP snapshots["runs the expected mysql2 query for max_lock_age 1"] = <<-SNAP -SELECT MIN(locked_at) AS minimum_locked_at, CASE WHEN priority >= 0 +SELECT (CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 - AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, `queue` AS queue + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority, queue, MIN(locked_at) AS locked_at, UTC_TIMESTAMP() AS db_now_utc FROM (SELECT priority, queue, MIN(locked_at) AS locked_at FROM `delayed_jobs` WHERE `delayed_jobs`.`locked_at` >= '2025-11-10 16:59:43' @@ -1188,10 +1188,10 @@ snapshots["[legacy index] produces the expected mysql2 query plan for max_lock_a SNAP snapshots["runs the expected mysql2 query for max_age 1"] = <<-SNAP -SELECT MIN(run_at) AS minimum_run_at, CASE WHEN priority >= 0 +SELECT (CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 - AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, `queue` AS queue + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority, queue, MIN(run_at) AS run_at, UTC_TIMESTAMP() AS db_now_utc FROM (SELECT priority, queue, MIN(run_at) AS run_at FROM `delayed_jobs` WHERE (`delayed_jobs`.`locked_at` IS NULL @@ -1307,10 +1307,10 @@ snapshots["[legacy index] produces the expected mysql2 query plan for workable_c SNAP snapshots["runs the expected mysql2 query for alert_age_percent 1"] = <<-SNAP -SELECT MIN(run_at) AS minimum_run_at, CASE WHEN priority >= 0 +SELECT (CASE WHEN priority >= 0 AND priority < 10 THEN 0 WHEN priority >= 10 AND priority < 20 THEN 10 WHEN priority >= 20 - AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END AS case_when_priority_0_and_priority_10_then_0_when_priority_10_an, `queue` AS queue + AND priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority, queue, MIN(run_at) AS run_at, UTC_TIMESTAMP() AS db_now_utc FROM (SELECT priority, queue, MIN(run_at) AS run_at FROM `delayed_jobs` WHERE (`delayed_jobs`.`locked_at` IS NULL diff --git a/spec/delayed/job_spec.rb b/spec/delayed/job_spec.rb index 1bb4faa8..8ab22e27 100644 --- a/spec/delayed/job_spec.rb +++ b/spec/delayed/job_spec.rb @@ -1006,12 +1006,6 @@ def create_job(opts = {}) end end - if ActiveRecord::VERSION::MAJOR >= 7 - delegate :default_timezone=, to: ActiveRecord - else - delegate :default_timezone=, to: ActiveRecord::Base - end - context "db_time_now" do after do Time.zone = nil diff --git a/spec/delayed/monitor_spec.rb b/spec/delayed/monitor_spec.rb index c22f7b3d..15160ba3 100644 --- a/spec/delayed/monitor_spec.rb +++ b/spec/delayed/monitor_spec.rb @@ -14,262 +14,333 @@ } end - it 'emits empty metrics for all default priorities' do - expect { subject.run! } - .to emit_notification("delayed.monitor.run").with_payload(default_payload.except(:queue)) - .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) - .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) - .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) - .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) - .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) - .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) - .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) - .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) - .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) - .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) - .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) - .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) - .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) - .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) - .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) - .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) - .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) - .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) - .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) - .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) - .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) - .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) - .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) - .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) - .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) - .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) - .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) - .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) - .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) - .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) - .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) - .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) - .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) - .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) - .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) - .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) - .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) - .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) - .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) - .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) - end + describe '#run!' do + let(:app_local_db_time) { false } - context 'when named priorities are customized' do around do |example| - Delayed::Priority.names = { high: 0, low: 7 } - example.run - ensure - Delayed::Priority.names = nil - end + if app_local_db_time + Time.zone = 'US/Central' + self.default_timezone = :local + end - it 'emits empty metrics for all custom priorities' do - expect { subject.run! } - .to emit_notification("delayed.monitor.run").with_payload(default_payload.except(:queue)) - .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'high')).with_value(0) - .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'low')).with_value(0) - .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) - .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) - .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) - .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) - .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) - .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) - .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) - .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) - .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) - .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) - .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) - .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) - .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'high')).with_value(0) - .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'low')).with_value(0) - .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'high')).with_value(0) - .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'low')).with_value(0) - .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'high')).with_value(0) - .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'low')).with_value(0) + # On PostgreSQL, running examples in a transaction allows CURRENT_TIMESTAMP to remain stable. + # We can in turn use this to set Timecop to the same time as the DB for deterministic time math. + Delayed::Job.transaction do + now = described_class.parse_utc_time( + Delayed::Job.connection.select_value("SELECT #{described_class.sql_now_in_utc}"), + ) + Timecop.freeze(now) { example.run } + end + ensure + Time.zone = nil + self.default_timezone = :utc end - end - context 'when there are jobs in the queue' do - let(:now) { Delayed::Job.db_time_now.change(nsec: 0) } - let(:job_attributes) do - { - run_at: now, - queue: 'default', - handler: "--- !ruby/object:SimpleJob\n", - attempts: 0, - } - end - let(:failed_attributes) { { run_at: now - 1.week, attempts: 1, failed_at: now - 1.day, locked_at: now - 1.day } } - let(:p0_attributes) { job_attributes.merge(priority: 1, attempts: 1) } - let(:p10_attributes) { job_attributes.merge(priority: 13, locked_at: now - 1.day) } - let(:p20_attributes) { job_attributes.merge(priority: 23, attempts: 1) } - let(:p30_attributes) { job_attributes.merge(priority: 999, locked_at: now - 1.day) } - let(:p0_payload) { default_payload.merge(priority: 'interactive') } - let(:p10_payload) { default_payload.merge(priority: 'user_visible') } - let(:p20_payload) { default_payload.merge(priority: 'eventual') } - let(:p30_payload) { default_payload.merge(priority: 'reporting') } - let!(:p0_workable_job) { Delayed::Job.create! p0_attributes.merge(run_at: now - 30.seconds) } - let!(:p0_failed_job) { Delayed::Job.create! p0_attributes.merge(failed_attributes) } - let!(:p0_future_job) { Delayed::Job.create! p0_attributes.merge(run_at: now + 1.hour) } - let!(:p0_working_job) { Delayed::Job.create! p0_attributes.merge(locked_at: now - 3.minutes) } - let!(:p10_workable_job) { Delayed::Job.create! p10_attributes.merge(run_at: now - 2.minutes) } - let!(:p10_failed_job) { Delayed::Job.create! p10_attributes.merge(failed_attributes) } - let!(:p10_future_job) { Delayed::Job.create! p10_attributes.merge(run_at: now + 1.hour) } - let!(:p10_working_job) { Delayed::Job.create! p10_attributes.merge(locked_at: now - 7.minutes) } - let!(:p20_workable_job) { Delayed::Job.create! p20_attributes.merge(run_at: now - 1.hour) } - let!(:p20_failed_job) { Delayed::Job.create! p20_attributes.merge(failed_attributes) } - let!(:p20_future_job) { Delayed::Job.create! p20_attributes.merge(run_at: now + 1.hour) } - let!(:p20_working_job) { Delayed::Job.create! p20_attributes.merge(locked_at: now - 9.minutes) } - let!(:p30_workable_job) { Delayed::Job.create! p30_attributes.merge(run_at: now - 6.hours) } - let!(:p30_failed_job) { Delayed::Job.create! p30_attributes.merge(failed_attributes) } - let!(:p30_future_job) { Delayed::Job.create! p30_attributes.merge(run_at: now + 1.hour) } - let!(:p30_working_job) { Delayed::Job.create! p30_attributes.merge(locked_at: now - 11.minutes) } - let!(:p30_workable_job_in_other_queue) { Delayed::Job.create! p30_attributes.merge(run_at: now - 4.hours, queue: 'banana') } + let(:now) { Delayed::Job.db_time_now } - around do |example| - Timecop.freeze(now) { example.run } - end - - it 'emits the expected results for each metric' do + it 'emits empty metrics for all default priorities' do expect { subject.run! } .to emit_notification("delayed.monitor.run").with_payload(default_payload.except(:queue)) - .and emit_notification("delayed.job.count").with_payload(p0_payload).with_value(4) - .and emit_notification("delayed.job.future_count").with_payload(p0_payload).with_value(1) - .and emit_notification("delayed.job.locked_count").with_payload(p0_payload).with_value(1) - .and emit_notification("delayed.job.erroring_count").with_payload(p0_payload).with_value(3) - .and emit_notification("delayed.job.failed_count").with_payload(p0_payload).with_value(1) - .and emit_notification("delayed.job.working_count").with_payload(p0_payload).with_value(1) - .and emit_notification("delayed.job.workable_count").with_payload(p0_payload).with_value(1) - .and emit_notification("delayed.job.max_age").with_payload(p0_payload).with_value(30.seconds) - .and emit_notification("delayed.job.max_lock_age").with_payload(p0_payload).with_value(3.minutes) - .and emit_notification("delayed.job.alert_age_percent").with_payload(p0_payload).with_value(30.0.seconds / 1.minute * 100) - .and emit_notification("delayed.job.count").with_payload(p10_payload).with_value(4) - .and emit_notification("delayed.job.future_count").with_payload(p10_payload).with_value(1) - .and emit_notification("delayed.job.locked_count").with_payload(p10_payload).with_value(1) - .and emit_notification("delayed.job.erroring_count").with_payload(p10_payload).with_value(0) - .and emit_notification("delayed.job.failed_count").with_payload(p10_payload).with_value(1) - .and emit_notification("delayed.job.working_count").with_payload(p10_payload).with_value(1) - .and emit_notification("delayed.job.workable_count").with_payload(p10_payload).with_value(1) - .and emit_notification("delayed.job.max_age").with_payload(p10_payload).with_value(2.minutes) - .and emit_notification("delayed.job.max_lock_age").with_payload(p10_payload).with_value(7.minutes) - .and emit_notification("delayed.job.alert_age_percent").with_payload(p10_payload).with_value(2.0.minutes / 3.minutes * 100) - .and emit_notification("delayed.job.count").with_payload(p20_payload).with_value(4) - .and emit_notification("delayed.job.future_count").with_payload(p20_payload).with_value(1) - .and emit_notification("delayed.job.locked_count").with_payload(p20_payload).with_value(1) - .and emit_notification("delayed.job.erroring_count").with_payload(p20_payload).with_value(3) - .and emit_notification("delayed.job.failed_count").with_payload(p20_payload).with_value(1) - .and emit_notification("delayed.job.working_count").with_payload(p20_payload).with_value(1) - .and emit_notification("delayed.job.workable_count").with_payload(p20_payload).with_value(1) - .and emit_notification("delayed.job.max_age").with_payload(p20_payload).with_value(1.hour) - .and emit_notification("delayed.job.max_lock_age").with_payload(p20_payload).with_value(9.minutes) - .and emit_notification("delayed.job.alert_age_percent").with_payload(p20_payload).with_value(1.hour / 1.5.hours * 100) - .and emit_notification("delayed.job.count").with_payload(p30_payload).with_value(4) - .and emit_notification("delayed.job.future_count").with_payload(p30_payload).with_value(1) - .and emit_notification("delayed.job.locked_count").with_payload(p30_payload).with_value(1) - .and emit_notification("delayed.job.erroring_count").with_payload(p30_payload).with_value(0) - .and emit_notification("delayed.job.failed_count").with_payload(p30_payload).with_value(1) - .and emit_notification("delayed.job.working_count").with_payload(p30_payload).with_value(1) - .and emit_notification("delayed.job.workable_count").with_payload(p30_payload).with_value(1) - .and emit_notification("delayed.job.max_age").with_payload(p30_payload).with_value(6.hours) - .and emit_notification("delayed.job.max_lock_age").with_payload(p30_payload).with_value(11.minutes) - .and emit_notification("delayed.job.alert_age_percent").with_payload(p30_payload).with_value(100) # 6 hours / 4 hours (overflow) - .and emit_notification("delayed.job.workable_count").with_payload(p30_payload.merge(queue: 'banana')).with_value(1) - .and emit_notification("delayed.job.max_age").with_payload(p30_payload.merge(queue: 'banana')).with_value(4.hours) + .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) + .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) + .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) + .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) + .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) + .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) + .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) + .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) + .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) + .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) + .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) + .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) + .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) + .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) + .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) + .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) + .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) + .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) + .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) + .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) + .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'interactive')).with_value(0) + .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'user_visible')).with_value(0) + .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'eventual')).with_value(0) + .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'reporting')).with_value(0) + .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'interactive')).approximately.with_value(0) + .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'user_visible')).approximately.with_value(0) + .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'eventual')).approximately.with_value(0) + .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'reporting')).approximately.with_value(0) + .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'interactive')).approximately.with_value(0) + .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'user_visible')).approximately.with_value(0) + .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'eventual')).approximately.with_value(0) + .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'reporting')).approximately.with_value(0) + .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'interactive')).approximately.with_value(0) + .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'user_visible')).approximately.with_value(0) + .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'eventual')).approximately.with_value(0) + .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'reporting')).approximately.with_value(0) end context 'when named priorities are customized' do around do |example| - Delayed::Priority.names = { high: 0, low: 20 } + Delayed::Priority.names = { high: 0, low: 7 } example.run ensure Delayed::Priority.names = nil end - let(:p0_payload) { default_payload.merge(priority: 'high') } - let(:p20_payload) { default_payload.merge(priority: 'low') } + + it 'emits empty metrics for all custom priorities' do + expect { subject.run! } + .to emit_notification("delayed.monitor.run").with_payload(default_payload.except(:queue)) + .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'high')).with_value(0) + .and emit_notification("delayed.job.count").with_payload(default_payload.merge(priority: 'low')).with_value(0) + .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) + .and emit_notification("delayed.job.future_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) + .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) + .and emit_notification("delayed.job.locked_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) + .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) + .and emit_notification("delayed.job.erroring_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) + .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) + .and emit_notification("delayed.job.working_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) + .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'high')).with_value(0) + .and emit_notification("delayed.job.workable_count").with_payload(default_payload.merge(priority: 'low')).with_value(0) + .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'high')).approximately.with_value(0) + .and emit_notification("delayed.job.max_age").with_payload(default_payload.merge(priority: 'low')).approximately.with_value(0) + .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'high')).approximately.with_value(0) + .and emit_notification("delayed.job.max_lock_age").with_payload(default_payload.merge(priority: 'low')).approximately.with_value(0) + .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'high')).with_value(0) + .and emit_notification("delayed.job.alert_age_percent").with_payload(default_payload.merge(priority: 'low')).with_value(0) + end + end + + context 'when there are jobs in the queue' do + let(:job_attributes) do + { + run_at: now, + queue: 'default', + handler: "--- !ruby/object:SimpleJob\n", + attempts: 0, + } + end + let(:failed_attributes) { { run_at: now - 1.week, attempts: 1, failed_at: now - 1.day, locked_at: now - 1.day } } + let(:p0_attributes) { job_attributes.merge(priority: 1, attempts: 1) } + let(:p10_attributes) { job_attributes.merge(priority: 13, locked_at: now - 1.day) } + let(:p20_attributes) { job_attributes.merge(priority: 23, attempts: 1) } + let(:p30_attributes) { job_attributes.merge(priority: 999, locked_at: now - 1.day) } + let(:p0_payload) { default_payload.merge(priority: 'interactive') } + let(:p10_payload) { default_payload.merge(priority: 'user_visible') } + let(:p20_payload) { default_payload.merge(priority: 'eventual') } + let(:p30_payload) { default_payload.merge(priority: 'reporting') } + let!(:p0_workable_job) { Delayed::Job.create! p0_attributes.merge(run_at: now - 30.seconds) } + let!(:p0_failed_job) { Delayed::Job.create! p0_attributes.merge(failed_attributes) } + let!(:p0_future_job) { Delayed::Job.create! p0_attributes.merge(run_at: now + 1.hour) } + let!(:p0_working_job) { Delayed::Job.create! p0_attributes.merge(locked_at: now - 3.minutes) } + let!(:p10_workable_job) { Delayed::Job.create! p10_attributes.merge(run_at: now - 2.minutes) } + let!(:p10_failed_job) { Delayed::Job.create! p10_attributes.merge(failed_attributes) } + let!(:p10_future_job) { Delayed::Job.create! p10_attributes.merge(run_at: now + 1.hour) } + let!(:p10_working_job) { Delayed::Job.create! p10_attributes.merge(locked_at: now - 7.minutes) } + let!(:p20_workable_job) { Delayed::Job.create! p20_attributes.merge(run_at: now - 1.hour) } + let!(:p20_failed_job) { Delayed::Job.create! p20_attributes.merge(failed_attributes) } + let!(:p20_future_job) { Delayed::Job.create! p20_attributes.merge(run_at: now + 1.hour) } + let!(:p20_working_job) { Delayed::Job.create! p20_attributes.merge(locked_at: now - 9.minutes) } + let!(:p30_workable_job) { Delayed::Job.create! p30_attributes.merge(run_at: now - 6.hours) } + let!(:p30_failed_job) { Delayed::Job.create! p30_attributes.merge(failed_attributes) } + let!(:p30_future_job) { Delayed::Job.create! p30_attributes.merge(run_at: now + 1.hour) } + let!(:p30_working_job) { Delayed::Job.create! p30_attributes.merge(locked_at: now - 11.minutes) } + let!(:p30_workable_job_in_other_queue) { Delayed::Job.create! p30_attributes.merge(run_at: now - 4.hours, queue: 'banana') } it 'emits the expected results for each metric' do expect { subject.run! } .to emit_notification("delayed.monitor.run").with_payload(default_payload.except(:queue)) - .and emit_notification("delayed.job.count").with_payload(p0_payload).with_value(8) - .and emit_notification("delayed.job.future_count").with_payload(p0_payload).with_value(2) - .and emit_notification("delayed.job.locked_count").with_payload(p0_payload).with_value(2) + .and emit_notification("delayed.job.count").with_payload(p0_payload).with_value(4) + .and emit_notification("delayed.job.future_count").with_payload(p0_payload).with_value(1) + .and emit_notification("delayed.job.locked_count").with_payload(p0_payload).with_value(1) .and emit_notification("delayed.job.erroring_count").with_payload(p0_payload).with_value(3) - .and emit_notification("delayed.job.failed_count").with_payload(p0_payload).with_value(2) - .and emit_notification("delayed.job.working_count").with_payload(p0_payload).with_value(2) - .and emit_notification("delayed.job.workable_count").with_payload(p0_payload).with_value(2) - .and emit_notification("delayed.job.max_age").with_payload(p0_payload).with_value(2.minutes) - .and emit_notification("delayed.job.max_lock_age").with_payload(p0_payload).with_value(7.minutes) - .and emit_notification("delayed.job.alert_age_percent").with_payload(p0_payload).with_value(0) - .and emit_notification("delayed.job.count").with_payload(p20_payload).with_value(8) - .and emit_notification("delayed.job.future_count").with_payload(p20_payload).with_value(2) - .and emit_notification("delayed.job.locked_count").with_payload(p20_payload).with_value(2) + .and emit_notification("delayed.job.failed_count").with_payload(p0_payload).with_value(1) + .and emit_notification("delayed.job.working_count").with_payload(p0_payload).with_value(1) + .and emit_notification("delayed.job.workable_count").with_payload(p0_payload).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p0_payload).approximately.with_value(30.seconds) + .and emit_notification("delayed.job.max_lock_age").with_payload(p0_payload).approximately.with_value(3.minutes) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p0_payload).approximately.with_value(30.0.seconds / 1.minute * 100) + .and emit_notification("delayed.job.count").with_payload(p10_payload).with_value(4) + .and emit_notification("delayed.job.future_count").with_payload(p10_payload).with_value(1) + .and emit_notification("delayed.job.locked_count").with_payload(p10_payload).with_value(1) + .and emit_notification("delayed.job.erroring_count").with_payload(p10_payload).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(p10_payload).with_value(1) + .and emit_notification("delayed.job.working_count").with_payload(p10_payload).with_value(1) + .and emit_notification("delayed.job.workable_count").with_payload(p10_payload).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p10_payload).approximately.with_value(2.minutes) + .and emit_notification("delayed.job.max_lock_age").with_payload(p10_payload).approximately.with_value(7.minutes) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p10_payload).approximately.with_value(2.0.minutes / 3.minutes * 100) + .and emit_notification("delayed.job.count").with_payload(p20_payload).with_value(4) + .and emit_notification("delayed.job.future_count").with_payload(p20_payload).with_value(1) + .and emit_notification("delayed.job.locked_count").with_payload(p20_payload).with_value(1) .and emit_notification("delayed.job.erroring_count").with_payload(p20_payload).with_value(3) - .and emit_notification("delayed.job.failed_count").with_payload(p20_payload).with_value(2) - .and emit_notification("delayed.job.working_count").with_payload(p20_payload).with_value(2) - .and emit_notification("delayed.job.workable_count").with_payload(p20_payload).with_value(2) - .and emit_notification("delayed.job.max_age").with_payload(p20_payload).with_value(6.hours) - .and emit_notification("delayed.job.max_lock_age").with_payload(p20_payload).with_value(11.minutes) - .and emit_notification("delayed.job.alert_age_percent").with_payload(p20_payload).with_value(0) - .and emit_notification("delayed.job.workable_count").with_payload(p20_payload.merge(queue: 'banana')).with_value(1) - .and emit_notification("delayed.job.max_age").with_payload(p20_payload.merge(queue: 'banana')).with_value(4.hours) + .and emit_notification("delayed.job.failed_count").with_payload(p20_payload).with_value(1) + .and emit_notification("delayed.job.working_count").with_payload(p20_payload).with_value(1) + .and emit_notification("delayed.job.workable_count").with_payload(p20_payload).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p20_payload).approximately.with_value(1.hour) + .and emit_notification("delayed.job.max_lock_age").with_payload(p20_payload).approximately.with_value(9.minutes) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p20_payload).approximately.with_value(1.hour / 1.5.hours * 100) + .and emit_notification("delayed.job.count").with_payload(p30_payload).with_value(4) + .and emit_notification("delayed.job.future_count").with_payload(p30_payload).with_value(1) + .and emit_notification("delayed.job.locked_count").with_payload(p30_payload).with_value(1) + .and emit_notification("delayed.job.erroring_count").with_payload(p30_payload).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(p30_payload).with_value(1) + .and emit_notification("delayed.job.working_count").with_payload(p30_payload).with_value(1) + .and emit_notification("delayed.job.workable_count").with_payload(p30_payload).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p30_payload).approximately.with_value(6.hours) + .and emit_notification("delayed.job.max_lock_age").with_payload(p30_payload).approximately.with_value(11.minutes) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p30_payload).approximately.with_value(100) # 6 hours / 4 hours (overflow) + .and emit_notification("delayed.job.workable_count").with_payload(p30_payload.merge(queue: 'banana')).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p30_payload.merge(queue: 'banana')).approximately.with_value(4.hours) end - context 'when alert thresholds are specified' do + context 'when named priorities are customized' do around do |example| - Delayed::Priority.alerts = { high: { age: 3.hours }, low: { age: 1.year } } + Delayed::Priority.names = { high: 0, low: 20 } example.run ensure - Delayed::Priority.alerts = nil + Delayed::Priority.names = nil end + let(:p0_payload) { default_payload.merge(priority: 'high') } + let(:p20_payload) { default_payload.merge(priority: 'low') } - it 'emits the expected alert_age_percent results' do + it 'emits the expected results for each metric' do expect { subject.run! } - .to emit_notification("delayed.job.alert_age_percent").with_payload(p0_payload).with_value(2.0.minutes / 3.hours * 100) - .and emit_notification("delayed.job.alert_age_percent").with_payload(p20_payload).with_value(6.0.hours / 1.year * 100) + .to emit_notification("delayed.monitor.run").with_payload(default_payload.except(:queue)) + .and emit_notification("delayed.job.count").with_payload(p0_payload).with_value(8) + .and emit_notification("delayed.job.future_count").with_payload(p0_payload).with_value(2) + .and emit_notification("delayed.job.locked_count").with_payload(p0_payload).with_value(2) + .and emit_notification("delayed.job.erroring_count").with_payload(p0_payload).with_value(3) + .and emit_notification("delayed.job.failed_count").with_payload(p0_payload).with_value(2) + .and emit_notification("delayed.job.working_count").with_payload(p0_payload).with_value(2) + .and emit_notification("delayed.job.workable_count").with_payload(p0_payload).with_value(2) + .and emit_notification("delayed.job.max_age").with_payload(p0_payload).approximately.with_value(2.minutes) + .and emit_notification("delayed.job.max_lock_age").with_payload(p0_payload).approximately.with_value(7.minutes) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p0_payload).approximately.with_value(0) + .and emit_notification("delayed.job.count").with_payload(p20_payload).with_value(8) + .and emit_notification("delayed.job.future_count").with_payload(p20_payload).with_value(2) + .and emit_notification("delayed.job.locked_count").with_payload(p20_payload).with_value(2) + .and emit_notification("delayed.job.erroring_count").with_payload(p20_payload).with_value(3) + .and emit_notification("delayed.job.failed_count").with_payload(p20_payload).with_value(2) + .and emit_notification("delayed.job.working_count").with_payload(p20_payload).with_value(2) + .and emit_notification("delayed.job.workable_count").with_payload(p20_payload).with_value(2) + .and emit_notification("delayed.job.max_age").with_payload(p20_payload).approximately.with_value(6.hours) + .and emit_notification("delayed.job.max_lock_age").with_payload(p20_payload).approximately.with_value(11.minutes) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p20_payload).approximately.with_value(0) + .and emit_notification("delayed.job.workable_count").with_payload(p20_payload.merge(queue: 'banana')).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p20_payload.merge(queue: 'banana')).approximately.with_value(4.hours) + end + + context 'when alert thresholds are specified' do + around do |example| + Delayed::Priority.alerts = { high: { age: 3.hours }, low: { age: 1.year } } + example.run + ensure + Delayed::Priority.alerts = nil + end + + it 'emits the expected alert_age_percent results' do + expect { subject.run! } + .to emit_notification("delayed.job.alert_age_percent").with_payload(p0_payload).approximately.with_value(2.0.minutes / 3.hours * 100) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p20_payload).approximately.with_value(6.0.hours / 1.year * 100) + end end end - end - context 'when worker queues are specified' do - around do |example| - Delayed::Worker.queues = %w(banana gram) - Delayed::Priority.names = { interactive: 0 } # avoid splitting by priority for simplicity - Delayed::Priority.alerts = { interactive: { age: 8.hours } } - example.run - ensure - Delayed::Priority.names = nil - Delayed::Worker.queues = [] + context 'when worker queues are specified' do + around do |example| + Delayed::Worker.queues = %w(banana gram) + Delayed::Priority.names = { interactive: 0 } # avoid splitting by priority for simplicity + Delayed::Priority.alerts = { interactive: { age: 8.hours } } + example.run + ensure + Delayed::Priority.names = nil + Delayed::Worker.queues = [] + end + let(:banana_payload) { default_payload.merge(queue: 'banana', priority: 'interactive') } + let(:gram_payload) { default_payload.merge(queue: 'gram', priority: 'interactive') } + + it 'emits the expected results for each queue' do + expect { subject.run! } + .to emit_notification("delayed.monitor.run").with_payload(default_payload.except(:queue)) + .and emit_notification("delayed.job.count").with_payload(banana_payload).with_value(1) + .and emit_notification("delayed.job.future_count").with_payload(banana_payload).with_value(0) + .and emit_notification("delayed.job.locked_count").with_payload(banana_payload).with_value(0) + .and emit_notification("delayed.job.erroring_count").with_payload(banana_payload).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(banana_payload).with_value(0) + .and emit_notification("delayed.job.working_count").with_payload(banana_payload).with_value(0) + .and emit_notification("delayed.job.workable_count").with_payload(banana_payload).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(banana_payload).approximately.with_value(4.hours) + .and emit_notification("delayed.job.max_lock_age").with_payload(banana_payload).approximately.with_value(0) + .and emit_notification("delayed.job.alert_age_percent").with_payload(banana_payload).approximately.with_value(4.0.hours / 8.hours * 100) + .and emit_notification("delayed.job.count").with_payload(gram_payload).with_value(0) + .and emit_notification("delayed.job.future_count").with_payload(gram_payload).with_value(0) + .and emit_notification("delayed.job.locked_count").with_payload(gram_payload).with_value(0) + .and emit_notification("delayed.job.erroring_count").with_payload(gram_payload).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(gram_payload).with_value(0) + .and emit_notification("delayed.job.working_count").with_payload(gram_payload).with_value(0) + .and emit_notification("delayed.job.workable_count").with_payload(gram_payload).with_value(0) + .and emit_notification("delayed.job.max_age").with_payload(gram_payload).approximately.with_value(0) + .and emit_notification("delayed.job.max_lock_age").with_payload(gram_payload).approximately.with_value(0) + .and emit_notification("delayed.job.alert_age_percent").with_payload(gram_payload).approximately.with_value(0) + end end - let(:banana_payload) { default_payload.merge(queue: 'banana', priority: 'interactive') } - let(:gram_payload) { default_payload.merge(queue: 'gram', priority: 'interactive') } - it 'emits the expected results for each queue' do - expect { subject.run! } - .to emit_notification("delayed.monitor.run").with_payload(default_payload.except(:queue)) - .and emit_notification("delayed.job.count").with_payload(banana_payload).with_value(1) - .and emit_notification("delayed.job.future_count").with_payload(banana_payload).with_value(0) - .and emit_notification("delayed.job.locked_count").with_payload(banana_payload).with_value(0) - .and emit_notification("delayed.job.erroring_count").with_payload(banana_payload).with_value(0) - .and emit_notification("delayed.job.failed_count").with_payload(banana_payload).with_value(0) - .and emit_notification("delayed.job.working_count").with_payload(banana_payload).with_value(0) - .and emit_notification("delayed.job.workable_count").with_payload(banana_payload).with_value(1) - .and emit_notification("delayed.job.max_age").with_payload(banana_payload).with_value(4.hours) - .and emit_notification("delayed.job.max_lock_age").with_payload(banana_payload).with_value(0) - .and emit_notification("delayed.job.alert_age_percent").with_payload(banana_payload).with_value(4.0.hours / 8.hours * 100) - .and emit_notification("delayed.job.count").with_payload(gram_payload).with_value(0) - .and emit_notification("delayed.job.future_count").with_payload(gram_payload).with_value(0) - .and emit_notification("delayed.job.locked_count").with_payload(gram_payload).with_value(0) - .and emit_notification("delayed.job.erroring_count").with_payload(gram_payload).with_value(0) - .and emit_notification("delayed.job.failed_count").with_payload(gram_payload).with_value(0) - .and emit_notification("delayed.job.working_count").with_payload(gram_payload).with_value(0) - .and emit_notification("delayed.job.workable_count").with_payload(gram_payload).with_value(0) - .and emit_notification("delayed.job.max_age").with_payload(gram_payload).with_value(0) - .and emit_notification("delayed.job.max_lock_age").with_payload(gram_payload).with_value(0) - .and emit_notification("delayed.job.alert_age_percent").with_payload(gram_payload).with_value(0) + context 'when using app-local timezone for DB timestamps' do + let(:app_local_db_time) { true } + + it 'emits the expected results for each metric' do + expect { subject.run! } + .to emit_notification("delayed.monitor.run").with_payload(default_payload.except(:queue)) + .and emit_notification("delayed.job.count").with_payload(p0_payload).with_value(4) + .and emit_notification("delayed.job.future_count").with_payload(p0_payload).with_value(1) + .and emit_notification("delayed.job.locked_count").with_payload(p0_payload).with_value(1) + .and emit_notification("delayed.job.erroring_count").with_payload(p0_payload).with_value(3) + .and emit_notification("delayed.job.failed_count").with_payload(p0_payload).with_value(1) + .and emit_notification("delayed.job.working_count").with_payload(p0_payload).with_value(1) + .and emit_notification("delayed.job.workable_count").with_payload(p0_payload).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p0_payload).approximately.with_value(30.seconds) + .and emit_notification("delayed.job.max_lock_age").with_payload(p0_payload).approximately.with_value(3.minutes) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p0_payload).approximately.with_value(30.0.seconds / 1.minute * 100) + .and emit_notification("delayed.job.count").with_payload(p10_payload).with_value(4) + .and emit_notification("delayed.job.future_count").with_payload(p10_payload).with_value(1) + .and emit_notification("delayed.job.locked_count").with_payload(p10_payload).with_value(1) + .and emit_notification("delayed.job.erroring_count").with_payload(p10_payload).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(p10_payload).with_value(1) + .and emit_notification("delayed.job.working_count").with_payload(p10_payload).with_value(1) + .and emit_notification("delayed.job.workable_count").with_payload(p10_payload).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p10_payload).approximately.with_value(2.minutes) + .and emit_notification("delayed.job.max_lock_age").with_payload(p10_payload).approximately.with_value(7.minutes) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p10_payload).approximately.with_value(2.0.minutes / 3.minutes * 100) + .and emit_notification("delayed.job.count").with_payload(p20_payload).with_value(4) + .and emit_notification("delayed.job.future_count").with_payload(p20_payload).with_value(1) + .and emit_notification("delayed.job.locked_count").with_payload(p20_payload).with_value(1) + .and emit_notification("delayed.job.erroring_count").with_payload(p20_payload).with_value(3) + .and emit_notification("delayed.job.failed_count").with_payload(p20_payload).with_value(1) + .and emit_notification("delayed.job.working_count").with_payload(p20_payload).with_value(1) + .and emit_notification("delayed.job.workable_count").with_payload(p20_payload).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p20_payload).approximately.with_value(1.hour) + .and emit_notification("delayed.job.max_lock_age").with_payload(p20_payload).approximately.with_value(9.minutes) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p20_payload).approximately.with_value(1.hour / 1.5.hours * 100) + .and emit_notification("delayed.job.count").with_payload(p30_payload).with_value(4) + .and emit_notification("delayed.job.future_count").with_payload(p30_payload).with_value(1) + .and emit_notification("delayed.job.locked_count").with_payload(p30_payload).with_value(1) + .and emit_notification("delayed.job.erroring_count").with_payload(p30_payload).with_value(0) + .and emit_notification("delayed.job.failed_count").with_payload(p30_payload).with_value(1) + .and emit_notification("delayed.job.working_count").with_payload(p30_payload).with_value(1) + .and emit_notification("delayed.job.workable_count").with_payload(p30_payload).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p30_payload).approximately.with_value(6.hours) + .and emit_notification("delayed.job.max_lock_age").with_payload(p30_payload).approximately.with_value(11.minutes) + .and emit_notification("delayed.job.alert_age_percent").with_payload(p30_payload).approximately.with_value(100) # 6 hours / 4 hours (overflow) + .and emit_notification("delayed.job.workable_count").with_payload(p30_payload.merge(queue: 'banana')).with_value(1) + .and emit_notification("delayed.job.max_age").with_payload(p30_payload.merge(queue: 'banana')).approximately.with_value(4.hours) + end end end end diff --git a/spec/helper.rb b/spec/helper.rb index 979499c1..066a8557 100644 --- a/spec/helper.rb +++ b/spec/helper.rb @@ -38,13 +38,12 @@ def with_log_level(level) ENV['RAILS_ENV'] = 'test' -db_adapter = ENV["ADAPTER"] -gemfile = ENV["BUNDLE_GEMFILE"] -db_adapter ||= gemfile && gemfile[%r{gemfiles/(.*?)/}] && $1 # rubocop:disable Style/PerlBackrefs -db_adapter ||= "sqlite3" +def current_adapter + ENV.fetch('ADAPTER', 'sqlite3') +end config = YAML.load(ERB.new(File.read("spec/database.yml")).result) -ActiveRecord::Base.establish_connection config[db_adapter] +ActiveRecord::Base.establish_connection config[current_adapter] ActiveRecord::Base.logger = Delayed.logger ActiveJob::Base.logger = Delayed.logger ActiveRecord::Migration.verbose = false @@ -58,7 +57,7 @@ def with_log_level(level) # MySQL 5.7 no longer supports null default values for the primary key # Override the default primary key type in Rails <= 4.0 # https://stackoverflow.com/a/34555109 -if db_adapter == "mysql2" +if current_adapter == "mysql2" types = if defined?(ActiveRecord::ConnectionAdapters::AbstractMysqlAdapter) # ActiveRecord 3.2+ ActiveRecord::ConnectionAdapters::AbstractMysqlAdapter::NATIVE_DATABASE_TYPES @@ -208,6 +207,14 @@ class SingletonClass ActiveSupport::Dependencies.autoload_paths << File.dirname(__FILE__) end +def default_timezone=(zone) + if ActiveRecord::VERSION::MAJOR >= 7 + ActiveRecord.default_timezone = zone + else + ActiveRecord::Base.default_timezone = zone + end +end + RSpec::Matchers.define :emit_notification do |expected_event_name| attr_reader :actual, :expected @@ -217,10 +224,15 @@ def supports_block_expectations? chain :with_payload, :expected_payload chain :with_value, :expected_value + chain(:approximately) { @approximately = true } diffable match do |block| - @expected = { event_name: expected_event_name, payload: expected_payload, value: expected_value } + if @approximately && current_adapter != 'postgresql' + @expected_value = a_value_within([2, @expected_value.abs * 0.05].max).of(@expected_value) + end + + @expected = { event_name: expected_event_name, payload: expected_payload, value: @expected_value } @actuals = [] callback = ->(name, _started, _finished, _unique_id, payload) do @actuals << { event_name: name, payload: payload.except(:value), value: payload[:value] } @@ -249,10 +261,6 @@ def supports_block_expectations? end end -def current_adapter - ENV.fetch('ADAPTER', 'sqlite3') -end - def current_database if current_adapter == 'sqlite3' a_string_ending_with('tmp/database.sqlite')