Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Fix the way we fetch jobs to work better for a large number of workers.

Instead of fetching 5 candidates at attempting to lock each one by one, just lock the next job in 1 query.

Example of old system and 100 workers (worst but not uncommon case):
1) 100 workers wake up and fetch 5 candidate jobs (they all get the same, or very similar set of 5 jobs), making a total of 100 SELECT calls
2) they all try to lock the first job (only 1 of 99 succeeds). The failing workers try the next, making a total of roughly 500 UPDATE calls
3) a total of 5 jobs get processed from about 600 database calls, and we restart the whole process

New system:
1) 100 workers wake up and fetch the next available job, each getting a single unique job (1 SELECT and 1 UPDATE call each).
2) a total of 100 jobs are processed with exactly 200 SQL calls

I've also included an example to make it all work in 1 call, avoiding an extra round-trip. This requires custom SQL only tested with PostgreSQL
  • Loading branch information...
commit 940bf92f0a90f07661f8683996a425dd09572222 1 parent f276ec2
@scosman scosman authored
Showing with 25 additions and 10 deletions.
  1. +25 −10 lib/delayed/backend/active_record.rb
View
35 lib/delayed/backend/active_record.rb
@@ -54,16 +54,31 @@ def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end
- # Find a few candidate jobs to run (in case some immediately get locked by others).
- def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
- scope = self.ready_to_run(worker_name, max_run_time)
- scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
- scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
- scope = scope.scoped(:conditions => ["queue IN (?)", Worker.queues]) if Worker.queues.any?
-
- ::ActiveRecord::Base.silence do
- scope.by_priority.all(:limit => limit)
- end

Is there any reason why this patch dropped the silencing of Active Record? With 0.4.0 forward all pooling queries are being logged, making the log file grow a lot when log_level is set to debug (ie development or other envs used for validating stuff). /cc @sferik

Hey @albus522 @sferik, do you have any input on this? :)

@sferik Owner
sferik added a note

Have you asked @scosman why he made this change? I wouldn’t be opposed to adding it back if it wasn’t removed for a good reason.

@sferik I didn't ask @scosman directly besides that first comment, but I imagined he'd get a notification and could answer back with some reasoning. In any case, hearing from him would definitely be good, thanks.

@scosman
scosman added a note

I can't recall at this point. Generally I prefer to not be hide db requests from the logs. It's annoying since it's polling, but these calls were causing major perf issues for us and were harder to track down because they were not in the logs.

@sferik your call.

@sferik Owner
sferik added a note

@scosman That logic seems reasonable to me.

@carlosantoniodasilva Any reason why you can’t just silence the logs in your app? This ought to do the trick:

module Delayed
  module Backend
    module ActiveRecord
      class Job < ::ActiveRecord::Base
        ::ActiveRecord::Base.silence do
          scope.by_priority.all(:limit => limit)
        end
      end
    end
  end
end

I see the reasoning (in fact that was my thought while reading the commit), however I'm unsure that the application logs are the best place to track such perf issues down. Anyway the problem I faced is that the AR logs are growing a lot due to the frequency DJ queries the database, and it was a bit hard to figure out why at first.

@sferik I could probably hack something out definitely, although I'm not sure the code you pasted is going to work :). Are you saying to override inside the Job class I presume?

The code has changed considerably since this commit, so I believe it'd probably not be so future proof, but I can give it a shot.

Thanks anyway.

@sferik Owner
sferik added a note

@sferik I could probably hack something out definitely, although I'm not sure the code you pasted is going to work :). Are you saying to override inside the Job class I presume?

Yeah, that’s what I meant. Updated the snipped above so it should work.

Alright, thanks.

For the record, this seems to do the trick (although it is not the most beautiful thing in the world):

# Silence DJ querying AR frequently.
Delayed::Backend::ActiveRecord::Job.instance_eval do
  def reserve_with_silence(*args)
    silence { reserve_without_silence(*args) }
  end

  class << self
    alias_method_chain :reserve, :silence
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ def self.reserve(worker, max_run_time = Worker.max_run_time)
+ # scope to filter to records that are "ready to run"
+ readyScope = self.ready_to_run(worker.name, max_run_time)
+
+ # scope to filter to the single next eligible job (locking it for update http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE)
+ nextScope = readyScope.scoped
+ nextScope = nextScope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
+ nextScope = nextScope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
+ nextScope = nextScope.scoped(:conditions => ["queue IN (?)", Worker.queues]) if Worker.queues.any?
+ nextScope = nextScope.scoped.by_priority.limit(1).lock(true)
+ nextScope = nextScope.scoped.select('id')
+
+ now = self.db_time_now
+
+ # This works on any database and uses seperate queries to lock and return the job
+ # Databases like PostgreSQL and MySQL that support "SELECT .. FOR UPDATE" (ActiveRecord Pessimistic locking) don't need the second application
+ # of 'readyScope' but it doesn't hurt and it ensures that the job being locked still meets ready_to_run criteria.
+ count = readyScope.where(:id => nextScope).update_all(:locked_at => now, :locked_by => worker.name)
+ return nil if count == 0
+ return self.where(:locked_at => now, :locked_by => worker.name).first
+
+ # This works on PostgreSQL and uses 1 less query, but uses SQL not supported nativly through ActiveRecord
+ #quotedTableName = ::ActiveRecord::Base.connection.quote_column_name(self.table_name)
+ #reserved = self.find_by_sql(["UPDATE #{quotedTableName} SET locked_at = ?, locked_by = ? WHERE id IN (#{nextScope.to_sql}) RETURNING *",now,worker.name])
+ #return reserved[0]
end
# Lock this job for this worker.
Please sign in to comment.
Something went wrong with that request. Please try again.