Skip to content

Commit

Permalink
Merge b0ec721 into 1806dfb
Browse files Browse the repository at this point in the history
  • Loading branch information
hlascelles committed Mar 24, 2018
2 parents 1806dfb + b0ec721 commit 3193105
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 53 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Unreleased

* Switched to use DB time to find "now" so as to match que queries

## 1.0.3 (2018-03-15)

* Enforced a minimum version of `et-orbi` to supply `#to_local_time` methods. Thanks to @jish.
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ integers, and job classes must be migrated from Resque to Que. Cron syntax can b
understood by [fugit](https://github.com/floraison/fugit#fugitcron).

It has one additional feature, `schedule_type: every_event`. This is set on a job that must be run for every
single matching cron time that goes by, even if the system is offline over more than one match. To better process these `every_event` jobs, they are always enqueued with the first
single matching cron time that goes by, even if the system is offline over more than one match.
To better process these `every_event` jobs, they are always enqueued with the first
argument being the time that they were supposed to be processed.

For example:
Expand Down Expand Up @@ -137,3 +138,4 @@ This gem was inspired by the makers of the excellent [Que](https://github.com/ch
## Contributors

* @jish
* @joehorsnell
5 changes: 5 additions & 0 deletions lib/que/scheduler/adapters/orm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class AdapterBase
SCHEDULER_COUNT_SQL =
'SELECT COUNT(*) FROM que_jobs WHERE job_class = ' \
"'#{Que::Scheduler::SchedulerJob.name}'".freeze
NOW_SQL = 'SELECT now()'.freeze

def transaction
transaction_base.transaction do
Expand All @@ -16,6 +17,10 @@ def transaction
def count_schedulers
dml(SCHEDULER_COUNT_SQL).first.values.first.to_i
end

def now
Time.zone.parse(dml(NOW_SQL).first.values.first)
end
end

class ActiveRecordAdapter < AdapterBase
Expand Down
14 changes: 8 additions & 6 deletions lib/que/scheduler/scheduler_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ def run(options = nil)
scheduler_job_args = SchedulerJobArgs.build(options)
logs = ["que-scheduler last ran at #{scheduler_job_args.last_run_time}."]

# It's possible one worker node has severe clock skew, and reports a time earlier than
# the last run. If so, log, and rescheduled with the same last run at.
# It's possible the DB time has been changed manaully to an earlier time than it was
# before. Whether this was a small amount of time (eg clock drift correction), or a major
# change like timezone, the business schedule semantics of this are unknowable, so log and
# rescheduled with the same last run at.
if scheduler_job_args.as_time < scheduler_job_args.last_run_time
handle_clock_skew(scheduler_job_args, logs)
handle_db_clock_change_backwards(scheduler_job_args, logs)
else
# Otherwise, run as normal
handle_normal_call(scheduler_job_args, logs)
Expand Down Expand Up @@ -63,9 +65,9 @@ def enqueue_required_jobs(scheduler_job_args, logs)
result
end

def handle_clock_skew(scheduler_job_args, logs)
logs << 'que-scheduler detected worker with time older than last run. ' \
'Rescheduling without enqueueing jobs.'
def handle_db_clock_change_backwards(scheduler_job_args, logs)
logs << 'que-scheduler detected the DB time is further back than the last run. ' \
'Rescheduling self again without enqueueing jobs to wait for the clock to catch up.'
enqueue_self_again(
scheduler_job_args.last_run_time,
scheduler_job_args.as_time,
Expand Down
7 changes: 4 additions & 3 deletions lib/que/scheduler/scheduler_job_args.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ class SchedulerJobArgs < Hashie::Dash
property :as_time, required: true

def self.build(options)
now = ::Que::Scheduler::Adapters::Orm.instance.now
parsed =
if options.nil?
# First ever run
# First ever run, there is nothing to do but reschedule self.
{
last_run_time: Time.zone.now,
last_run_time: now,
job_dictionary: []
}
else
Expand All @@ -25,7 +26,7 @@ def self.build(options)
job_dictionary: options.fetch(:job_dictionary)
}
end
SchedulerJobArgs.new(parsed.merge(as_time: Time.zone.now))
SchedulerJobArgs.new(parsed.merge(as_time: now))
end
end
end
Expand Down
75 changes: 38 additions & 37 deletions spec/que/scheduler/adapters/orm_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,48 @@

::DB = Sequel.sqlite
::ActiveRecord::Base.establish_connection(adapter: 'sqlite3', database: ':memory:')
QSAO = ::Que::Scheduler::Adapters::Orm

RSpec.describe ::Que::Scheduler::Adapters::Orm do
context 'adapters' do
def perform_adapter_count_checks
adapter.ddl('CREATE TABLE que_jobs (job_class VARCHAR(255));')
adapter.ddl("INSERT INTO que_jobs values ('SomeJob');")
expect(adapter.count_schedulers).to eq(0)
adapter.ddl("INSERT INTO que_jobs values ('Que::Scheduler::SchedulerJob');")
expect(adapter.count_schedulers).to eq(1)
adapter.ddl("INSERT INTO que_jobs values ('Que::Scheduler::SchedulerJob');")
expect(adapter.count_schedulers).to eq(2)
end
RSpec.describe QSAO do
{
QSAO::ActiveRecordAdapter => ::ActiveRecord::Base,
QSAO::SequelAdapter => ::DB
}.each do |adapters, connection|
describe adapters do
let(:adapter) { described_class.new }

def perform_adapter_transaction_check(underlying_orm)
expect(underlying_orm).to receive(:transaction) do |_, &block|
expect(block.call).to eq('test')
end
adapter.transaction do
'test'
describe '#count_schedulers' do
it 'finds the right number of rows' do
adapter.ddl('CREATE TABLE que_jobs (job_class VARCHAR(255));')
adapter.ddl("INSERT INTO que_jobs values ('SomeJob');")
expect(adapter.count_schedulers).to eq(0)
adapter.ddl("INSERT INTO que_jobs values ('Que::Scheduler::SchedulerJob');")
expect(adapter.count_schedulers).to eq(1)
adapter.ddl("INSERT INTO que_jobs values ('Que::Scheduler::SchedulerJob');")
expect(adapter.count_schedulers).to eq(2)
end
end
end

let(:adapter) { described_class.new }

describe ::Que::Scheduler::Adapters::Orm::ActiveRecordAdapter do
it 'executes the correct SQL to count rows' do
perform_adapter_count_checks
end

it 'performs an action in a transaction' do
perform_adapter_transaction_check(::ActiveRecord::Base)
end
end

describe ::Que::Scheduler::Adapters::Orm::SequelAdapter do
it 'executes the correct SQL' do
perform_adapter_count_checks
describe '#transaction' do
it 'starts a transaction correctly' do
expect(connection).to receive(:transaction) do |_, &block|
expect(block.call).to eq('test')
end
adapter.transaction do
'test'
end
end
end

it 'performs an action in a transaction' do
perform_adapter_transaction_check(::DB)
describe '#now' do
it 'returns the value from the DB' do
expect(adapter).to receive(:dml).with('SELECT now()').and_return(
[{ 'now' => '2018-03-24 10:18:29.079874+01' }]
)
# Check the time parsing handles timezones by expecting a different time in a different
# timezone.
expect(adapter.now).to eq(Time.zone.parse('2018-03-24 09:18:29.079874+00'))
end
end
end
end
Expand All @@ -57,14 +58,14 @@ def perform_adapter_transaction_check(underlying_orm)
it 'returns the correct class for ActiveRecord' do
expect(Gem.loaded_specs).to receive(:has_key?).with('activerecord').and_return(true)
orm = described_class.instance
expect(orm.class).to eq(::Que::Scheduler::Adapters::Orm::ActiveRecordAdapter)
expect(orm.class).to eq(QSAO::ActiveRecordAdapter)
end

it 'returns the correct class for Sequel' do
expect(Gem.loaded_specs).to receive(:has_key?).with('activerecord').and_return(false)
expect(Gem.loaded_specs).to receive(:has_key?).with('sequel').and_return(true)
orm = described_class.instance
expect(orm.class).to eq(::Que::Scheduler::Adapters::Orm::SequelAdapter)
expect(orm.class).to eq(QSAO::SequelAdapter)
end

it 'errors for no orm' do
Expand Down
2 changes: 2 additions & 0 deletions spec/que/scheduler/scheduler_job_args_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
RSpec.describe Que::Scheduler::SchedulerJobArgs do
it 'should prepare default args' do
Timecop.freeze do
apply_db_time_now
args = described_class.build(nil)
expect(args.last_run_time).to eq(Time.zone.now)
expect(args.as_time).to eq(Time.zone.now)
Expand All @@ -17,6 +18,7 @@

def attempt_parse(options)
Timecop.freeze do
apply_db_time_now
args = described_class.build(options)
expect(args.last_run_time.iso8601).to eq(last_time.iso8601)
expect(args.as_time).to eq(Time.zone.now)
Expand Down
16 changes: 10 additions & 6 deletions spec/que/scheduler/scheduler_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
allow(QS::Adapters::Orm.instance).to receive(:transaction) do |_, &block|
block.call
end
Que.adapter.jobs.clear
end

let(:job) { QSSJ.new({}) }
Expand All @@ -29,6 +28,10 @@
end
end

before(:each) do
apply_db_time_now
end

it 'enqueues nothing having loaded the dictionary on the first run' do
run_test(nil, {}, [])
end
Expand Down Expand Up @@ -138,14 +141,15 @@ def expect_scheduled(list, new_dictionary)
expect(all_enqueued).to eq(list)
end

context 'clock skew' do
# The scheduler job must notice when it is being run on a box that is reporting a time earlier
# than the last time it ran. It should do nothing except reschedule itself.
it 'handles clock skew' do
context 'clock change backwards' do
# The scheduler job must notice when the db is reporting a time further back
# than the last time it ran. The job should do nothing except reschedule itself.
it 'handled by rescheduling self' do
last_run = Time.zone.parse('2017-11-08T13:50:32')

Timecop.freeze(last_run - 1.hour) do
expect(job).to receive(:handle_clock_skew).and_call_original
apply_db_time_now
expect(job).to receive(:handle_db_clock_change_backwards).and_call_original
job.run(last_run_time: last_run.iso8601, job_dictionary: %w[SomeJob])
expect_itself_enqueued(last_run, Time.zone.now, %w[SomeJob])
end
Expand Down
3 changes: 3 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@
config.mock_with :rspec do |mocks|
mocks.verify_partial_doubles = true
end
config.before(:each) do
Que.adapter.jobs.clear
end
end
3 changes: 3 additions & 0 deletions spec/support/time_support.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def apply_db_time_now
allow(QS::Adapters::Orm.instance).to receive(:now).and_return(Time.zone.now)
end

0 comments on commit 3193105

Please sign in to comment.