Skip to content

Commit

Permalink
Added competitive locks and tests for main lock functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
cthulhu committed Jul 19, 2011
1 parent 3c2b7c4 commit a3ee17b
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 43 deletions.
93 changes: 50 additions & 43 deletions lib/resque/lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,27 @@
module Resque
module Lock

# normal lock
def lock_key( options = {} )
klass = options.delete("klass") || self.name
"lock:#{klass}-#{options.keys.map(&:to_s).sort.join("|")}-#{options.values.map(&:to_s).sort.join("|")}"
"lock:#{klass}-#{options.to_a.sort_by(&:first).map{|a| a.join("=") }.join("|").to_s}"
end

def competitive_lock_key( options = {} )
klass = options.delete("klass") || self.name
"competitive-lock:#{klass}-#{options.to_a.sort_by(&:first).map{|a| a.join("=") }.join("|").to_s}"
end

def locked?( options )
Resque.redis.exists( lock_key( options ) )
end

def lock_uuid( options )
Resque.redis.get( lock_key( options ) )
end

# competitive lock

def _extra_locks_list_options options = {}
if self.respond_to? :extra_locks_list_options
self.send :extra_locks_list_options, options
Expand All @@ -26,68 +42,59 @@ def _extra_locks_jobs_list_options options = {}
end
end

def locked?( options )
Resque.redis.exists( lock_key( options ) )
def locked_by_competitor? options
Resque.redis.exists( competitive_lock_key( options ) )
end

def lock_uuid( options )
Resque.redis.get( lock_key( options ) )
end

def unlock_uuid( options )
Resque.redis.del( lock_key( options ) )
end

def lock_different_jobs uuid, options
_extra_locks_jobs_list_options( options ).each do | extra_lock_jobs_opts |
Resque.redis.set( lock_key( extra_lock_jobs_opts ), uuid )

def with_competitive_lock uuid, options
_extra_locks_list_options( options ).each do | extra_lock_opts |
Resque.redis.set( competitive_lock_key( extra_lock_opts ), uuid )
end
end

def unlock_different_jobs options
_extra_locks_jobs_list_options( options ).each do | extra_lock_jobs_opts |
unlock_uuid( extra_lock_jobs_opts )
Resque.redis.set( competitive_lock_key( extra_lock_jobs_opts ), uuid )
end
end

def lock_same_jobs uuid, options
_extra_locks_list_options( options ).each do | extra_lock_opts |
Resque.redis.set( lock_key( extra_lock_opts ), uuid )
begin
yield
ensure
_extra_locks_jobs_list_options( options ).each do | extra_lock_jobs_opts |
competitive_unlock( extra_lock_jobs_opts )
end
_extra_locks_list_options( options ).each do | extra_lock_opts |
competitive_unlock( extra_lock_opts )
end
end
end

def unlock_same_jobs options
_extra_locks_list_options( options ).each do | extra_lock_opts |
unlock_uuid( extra_lock_opts )
end
end

# Where the magic happens.
def enqueue(klass, options = {})
# Abort if another job added.
uuid = lock_uuid( options )
if uuid.blank?
if uuid.nil? || uuid.empty?
uuid = super(klass, options)
Resque.redis.set( lock_key( options ), uuid )
lock_same_jobs uuid, options
lock_different_jobs uuid, options
end
uuid
end

def around_perform_lock *args
begin
yield
ensure
begin
unlock_same_jobs( args[1] )
unlock_different_jobs( args[1] )
unlock_uuid( args[1] )
rescue => e
puts e.to_s
puts e.backtrace.join("\n")
unless locked_by_competitor? args[1]
uuid = lock_uuid( args[1] )
with_competitive_lock uuid, args[1] do
yield
end
unlock( args[1] )
else
unlock( args[1] )
Resque.enqueue(self, *args)
end
end
protected
def competitive_unlock( options )
Resque.redis.del( competitive_lock_key( options ) )
end
def unlock( options )
Resque.redis.del( lock_key( options ) )
end

end
end
36 changes: 36 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

require 'resque/status'
require 'resque/job_with_status'
require 'resque/lock'

class Test::Unit::TestCase
end
Expand Down Expand Up @@ -80,3 +81,38 @@ def perform
end

end

class LockedJob < Resque::JobWithStatus

def perform
total = options['num']
(1..total).each do |num|
at(num, total, "At #{num}")
end
end

end

class LockedKillableJob < Resque::JobWithStatus

def perform
Resque.redis.set("#{uuid}:iterations", 0)
100.times do |num|
Resque.redis.incr("#{uuid}:iterations")
at(num, 100, "At #{num} of 100")
end
end

end

class LockedWithErrorJob < Resque::JobWithStatus

def perform
Resque.redis.set("#{uuid}:iterations", 0)
100.times do |num|
Resque.redis.incr("#{uuid}:iterations")
at(num, 100, "At #{num} of 100")
end
end

end
61 changes: 61 additions & 0 deletions test/test_resque-lock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
require 'test_helper'

class TestLockedJob < Resque::JobWithStatus
extend Resque::Lock
@queue = :test

def self.perform
end
end

class TestExtraLockedJob < Resque::JobWithStatus

extend Resque::Lock
@queue = :test

def self.perform
end
end


class TestResqueLock < Test::Unit::TestCase

context "Resque::Lock" do
setup do
Resque.redis.flushall
end
context "on enqueue with extra lock" do
setup do
@options = { :arrg1 => 1 }
@uuid = TestExtraLockedJob.create @options
@status = Resque::Status.get( @uuid )

end
end
context "on enqueue" do
setup do
@options = { :arrg1 => 1 }
@uuid = TestLockedJob.create @options
@status = Resque::Status.get( @uuid )
end
should "create a lock with appropriated status uuid" do
assert TestLockedJob.locked?( @options )
assert !TestExtraLockedJob.locked?( @options )
assert @status["status"] == "queued"
end
context "and enqueue" do
setup do
@uuid2 = TestLockedJob.create @options
@status2 = Resque::Status.get( @uuid2 )
end
should "be locked with the same job id" do
assert TestLockedJob.locked?( @options )
assert !TestExtraLockedJob.locked?( @options )
assert @status2["status"] == "queued"
assert @status == @status2
assert @uuid == @uuid2
end
end
end
end
end

0 comments on commit a3ee17b

Please sign in to comment.