Skip to content
This repository has been archived by the owner on May 26, 2021. It is now read-only.

Commit

Permalink
Merge branch 'strings_with_hooks' of https://github.com/infbio/resque…
Browse files Browse the repository at this point in the history
…-scheduler into infbio-strings_with_hooks
  • Loading branch information
bvandenbos committed Apr 18, 2011
2 parents 4b66d06 + 5029aa2 commit 5310ebe
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 90 deletions.
34 changes: 13 additions & 21 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,52 +1,44 @@
PATH
remote: .
specs:
resque-scheduler (1.9.7)
resque-scheduler (2.0.0.c)
redis (>= 2.0.1)
resque (>= 1.8.0)
resque (>= 1.15.0)
rufus-scheduler

GEM
remote: http://rubygems.org/
specs:
git (1.2.5)
jeweler (1.5.1)
bundler (~> 1.0.0)
git (>= 1.2.5)
rake
json (1.4.6)
mocha (0.9.9)
rake
rack (1.2.1)
rack-test (0.5.6)
rack (>= 1.0)
rake (0.8.7)
redis (2.1.1)
redis-namespace (0.8.0)
redis (2.2.0)
redis-namespace (0.10.0)
redis (< 3.0.0)
resque (1.10.0)
resque (1.15.0)
json (~> 1.4.6)
redis-namespace (~> 0.8.0)
redis-namespace (>= 0.10.0)
sinatra (>= 0.9.2)
vegas (~> 0.1.2)
rufus-scheduler (2.0.7)
tzinfo
sinatra (1.1.0)
rufus-scheduler (2.0.8)
tzinfo (>= 0.3.23)
sinatra (1.2.1)
rack (~> 1.1)
tilt (~> 1.1)
tilt (1.1)
tzinfo (0.3.23)
tilt (< 2.0, >= 1.2.2)
tilt (1.2.2)
tzinfo (0.3.25)
vegas (0.1.8)
rack (>= 1.0.0)

PLATFORMS
ruby

DEPENDENCIES
jeweler
bundler (>= 1.0.0)
mocha
rack-test
redis (>= 2.0.1)
resque (>= 1.8.0)
resque-scheduler!
rufus-scheduler
42 changes: 26 additions & 16 deletions lib/resque/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ class << self

# If true, logs more stuff...
attr_accessor :verbose

# If set, produces no output
attr_accessor :mute

# If set, will try to update the schulde in the loop
attr_accessor :dynamic

# the Rufus::Scheduler jobs that are scheduled
def scheduled_jobs
@@scheduled_jobs
Expand Down Expand Up @@ -53,7 +53,7 @@ def run
def register_signal_handlers
trap("TERM") { shutdown }
trap("INT") { shutdown }

begin
trap('QUIT') { shutdown }
trap('USR1') { kill_child }
Expand All @@ -67,19 +67,19 @@ def register_signal_handlers
# rufus scheduler instance
def load_schedule!
# Need to load the schedule from redis for the first time if dynamic
Resque.reload_schedule! if dynamic
Resque.reload_schedule! if dynamic

log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty?

@@scheduled_jobs = {}

Resque.schedule.each do |name, config|
load_schedule_job(name, config)
end
Resque.redis.del(:schedules_changed)
procline "Schedules Loaded"
end

# Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs
def load_schedule_job(name, config)
# If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
Expand Down Expand Up @@ -128,7 +128,7 @@ def handle_delayed_items(at_time=nil)
end
end
end

# Enqueues all delayed jobs for a timestamp
def enqueue_delayed_items_for_timestamp(timestamp)
item = nil
Expand All @@ -152,9 +152,12 @@ def handle_shutdown
# Enqueues a job based on a config hash
def enqueue_from_config(job_config)
args = job_config['args'] || job_config[:args]

klass_name = job_config['class'] || job_config[:class]
klass = constantize(klass_name) rescue klass_name

params = args.is_a?(Hash) ? [args] : Array(args)
queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(constantize(klass_name))
queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(klass)
# Support custom job classes like those that inherit from Resque::JobWithStatus (resque-status)
if (job_klass = job_config['custom_job_class']) && (job_klass != 'Resque::Job')
# The custom job class API must offer a static "scheduled" method. If the custom
Expand All @@ -167,7 +170,14 @@ def enqueue_from_config(job_config)
Resque::Job.create(queue, job_klass, *params)
end
else
Resque::Job.create(queue, klass_name, *params)
# hack to avoid havoc for people shoving stuff into queues
# for non-existent classes (for example: running scheduler in
# one app that schedules for another
if Class === klass
Resque.enqueue(klass, *params)
else
Resque::Job.create(queue, klass, *params)
end
end
rescue
log! "Failed to enqueue #{klass_name}:\n #{$!}"
Expand All @@ -185,13 +195,13 @@ def clear_schedule!
@@scheduled_jobs = {}
rufus_scheduler
end

def reload_schedule!
procline "Reloading Schedule"
clear_schedule!
load_schedule!
end

def update_schedule
if Resque.redis.scard(:schedules_changed) > 0
procline "Updating schedule"
Expand All @@ -207,7 +217,7 @@ def update_schedule
procline "Schedules Loaded"
end
end

def unschedule_job(name)
if scheduled_jobs[name]
log "Removing schedule #{name}"
Expand Down Expand Up @@ -238,7 +248,7 @@ def log(msg)
# add "verbose" logic later
log!(msg) if verbose
end

def procline(string)
log! string
$0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}"
Expand Down
7 changes: 3 additions & 4 deletions resque-scheduler.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,17 @@ Gem::Specification.new do |s|
s.description = %q{Light weight job scheduling on top of Resque.
Adds methods enqueue_at/enqueue_in to schedule jobs in the future.
Also supports queueing jobs on a fixed, cron-like schedule.}

s.required_rubygems_version = ">= 1.3.6"
s.add_development_dependency "bundler", ">= 1.0.0"

s.files = `git ls-files`.split("\n")
s.executables = `git ls-files`.split("\n").map{|f| f =~ /^bin\/(.*)/ ? $1 : nil}.compact
s.require_path = 'lib'

s.add_runtime_dependency(%q<redis>, [">= 2.0.1"])
s.add_runtime_dependency(%q<resque>, [">= 1.8.0"])
s.add_runtime_dependency(%q<resque>, [">= 1.15.0"])
s.add_runtime_dependency(%q<rufus-scheduler>, [">= 0"])
s.add_development_dependency(%q<mocha>, [">= 0"])
s.add_development_dependency(%q<rack-test>, [">= 0"])

end
21 changes: 9 additions & 12 deletions test/delayed_queue_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_enqueue_at_adds_correct_list_and_zset
# Confirm the item came out correctly
assert_equal('SomeIvarJob', item['class'], "Should be the same class that we queued")
assert_equal(["path"], item['args'], "Should have the same arguments that we queued")

# And now confirm the keys are gone
assert(!Resque.redis.exists("delayed:#{timestamp.to_i}"))
assert_equal(0, Resque.redis.zcard(:delayed_queue_schedule), "delayed queue should be empty")
Expand Down Expand Up @@ -126,8 +126,7 @@ def test_handle_delayed_items_with_items
Resque.enqueue_at(t, SomeIvarJob)

# 2 SomeIvarJob jobs should be created in the "ivar" queue
Resque::Job.expects(:create).twice.with('ivar', 'SomeIvarJob', nil)
Resque.expects(:queue_from_class).never # Should NOT need to load the class
Resque::Job.expects(:create).twice.with(:ivar, SomeIvarJob, nil)
Resque::Scheduler.handle_delayed_items
end

Expand All @@ -137,23 +136,21 @@ def test_handle_delayed_items_with_items_in_the_future
Resque.enqueue_at(t, SomeIvarJob)

# 2 SomeIvarJob jobs should be created in the "ivar" queue
Resque::Job.expects(:create).twice.with('ivar', 'SomeIvarJob', nil)
Resque.expects(:queue_from_class).never # Should NOT need to load the class
Resque::Job.expects(:create).twice.with(:ivar, SomeIvarJob, nil)
Resque::Scheduler.handle_delayed_items(t)
end

def test_enqueue_delayed_items_for_timestamp
t = Time.now + 60

Resque.enqueue_at(t, SomeIvarJob)
Resque.enqueue_at(t, SomeIvarJob)

# 2 SomeIvarJob jobs should be created in the "ivar" queue
Resque::Job.expects(:create).twice.with('ivar', 'SomeIvarJob', nil)
Resque.expects(:queue_from_class).never # Should NOT need to load the class
Resque::Job.expects(:create).twice.with(:ivar, SomeIvarJob, nil)

Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)

# delayed queue for timestamp should be empty
assert_equal(0, Resque.delayed_timestamp_peek(t, 0, 3).length)
end
Expand All @@ -165,7 +162,7 @@ def test_works_with_out_specifying_queue__upgrade_case
# Since we didn't specify :queue when calling delayed_push, it will be forced
# to load the class to figure out the queue. This is the upgrade case from 1.0.4
# to 1.0.5.
Resque::Job.expects(:create).once.with(:ivar, 'SomeIvarJob', nil)
Resque::Job.expects(:create).once.with(:ivar, SomeIvarJob, nil)

Resque::Scheduler.handle_delayed_items
end
Expand Down Expand Up @@ -206,7 +203,7 @@ def test_remove_specific_item_in_group_of_other_items_at_same_timestamp
assert_equal(2, Resque.remove_delayed(SomeIvarJob, "bar"))
assert_equal(1, Resque.delayed_queue_schedule_size)
end

def test_remove_specific_item_in_group_of_other_items_at_different_timestamps
t = Time.now + 120
Resque.enqueue_at(t, SomeIvarJob, "foo")
Expand Down
Loading

0 comments on commit 5310ebe

Please sign in to comment.