Permalink
Browse files

workers now only scale every n minutes

workers wait in after perform for either time to scale come up or for new pending tasks
  • Loading branch information...
1 parent 98d1c3f commit 0feba85d8f44fb9157ab4ecc0f7de6d25e9f1b81 @ajmurmann ajmurmann committed May 22, 2011
View
@@ -7,5 +7,6 @@ group :development do
gem "rspec", "~> 2.0"
gem 'rr', '1.0.2'
gem 'rake'
+ gem 'timecop', '0.3.5'
end
@@ -28,6 +28,11 @@ def heroku_app
@heroku_app || ENV['HEROKU_APP']
end
+ attr_writer :wait_time
+ def wait_time
+ @wait_time || 60
+ end
+
def new_worker_count(pending=nil, *payload, &calculate_count)
if calculate_count
@new_worker_count = calculate_count
@@ -7,9 +7,10 @@ module HerokuAutoscaler
def after_enqueue_scale_workers_up(*args)
if !Resque::Plugins::HerokuAutoscaler::Config.scaling_disabled? && \
- Resque.info[:workers] == 0 && \
- Resque::Plugins::HerokuAutoscaler::Config.new_worker_count(Resque.info[:pending]) >= 1
+ Resque.info[:workers] == 0 && \
+ Resque::Plugins::HerokuAutoscaler::Config.new_worker_count(Resque.info[:pending]) >= 1
set_workers(1)
+ Resque.redis.set('last_scaled', Time.now)
end
end
@@ -40,15 +41,33 @@ def self.config
yield Resque::Plugins::HerokuAutoscaler::Config
end
- private
-
def calculate_and_set_workers
unless Resque::Plugins::HerokuAutoscaler::Config.scaling_disabled?
- new_count = Resque::Plugins::HerokuAutoscaler::Config.new_worker_count(Resque.info[:pending])
- set_workers(new_count) if new_count == 0 || new_count > current_workers
+ wait_for_task_or_scale
+ if time_to_scale?
+ scale
+ end
end
end
+ private
+
+ def scale
+ new_count = Resque::Plugins::HerokuAutoscaler::Config.new_worker_count(Resque.info[:pending])
+ set_workers(new_count) if new_count == 0 || new_count > current_workers
+ Resque.redis.set('last_scaled', Time.now)
+ end
+
+ def wait_for_task_or_scale
+ until Resque.info[:pending] > 0 || time_to_scale?
+ Kernel.sleep(0.5)
+ end
+ end
+
+ def time_to_scale?
+ (Time.now - Time.parse(Resque.redis.get('last_scaled'))) >= Resque::Plugins::HerokuAutoscaler::Config.wait_time
+ end
+
def log(message)
if defined?(Rails)
Rails.logger.info(message)
View
@@ -53,6 +53,16 @@
end
end
+ describe ".wait_time" do
+
+ it{ Resque::Plugins::HerokuAutoscaler::Config.wait_time.should == 60}
+
+ it "can be set" do
+ subject.wait_time = 30
+ subject.wait_time.should == 30
+ end
+ end
+
describe ".new_worker_count" do
before do
@original_method = Resque::Plugins::HerokuAutoscaler::Config.instance_variable_get(:@new_worker_count)
@@ -2,6 +2,7 @@
require 'heroku'
require 'resque'
require 'resque/plugins/resque_heroku_autoscaler'
+require 'timecop'
class TestJob
extend Resque::Plugins::HerokuAutoscaler
@@ -62,6 +63,17 @@ class AnotherJob
Resque::Plugins::HerokuAutoscaler::Config.instance_variable_set(:@new_worker_count, @original_method)
end
+ it "should set last_scaled" do
+ now = Time.now
+ Timecop.freeze(now)
+ stub(TestJob).set_workers(1)
+
+ TestJob.after_enqueue_scale_workers_up
+ Resque.redis.get('last_scaled').should == now.to_s
+
+ Timecop.return
+ end
+
context "when scaling workers is disabled" do
before do
subject.config do |c|
@@ -78,9 +90,9 @@ class AnotherJob
describe ".after_perform_scale_workers" do
before do
+ Resque.redis.set('last_scaled', Time.now - 120)
stub(TestJob).heroku_client { @fake_heroku_client }
end
-
it "should add the hook" do
Resque::Plugin.after_hooks(TestJob).should include("after_perform_scale_workers")
end
@@ -91,75 +103,11 @@ class AnotherJob
lambda { TestJob.after_perform_scale_workers("some", "random", "aguments", 42) }.should_not raise_error
end
-
- context "when the queue is empty" do
- before do
- stub(Resque).info { {:pending => 0} }
- end
-
- it "should set workers to 0" do
- mock(TestJob).set_workers(0)
- TestJob.after_perform_scale_workers
- end
- end
-
- context "when the queue is not empty" do
- before do
- stub(Resque).info { {:pending => 1} }
- end
-
- it "should keep workers at 1" do
- mock(TestJob).set_workers(1)
- TestJob.after_perform_scale_workers
- end
- end
-
- context "when new_worker_count was changed" do
- before do
- @original_method = Resque::Plugins::HerokuAutoscaler::Config.instance_variable_get(:@new_worker_count)
- subject.config do |c|
- c.new_worker_count do
- 2
- end
- end
- end
-
- after do
- Resque::Plugins::HerokuAutoscaler::Config.instance_variable_set(:@new_worker_count, @original_method)
- end
-
- it "should use the given block" do
- mock(TestJob).set_workers(2)
- TestJob.after_perform_scale_workers
- end
- end
-
- context "when scaling workers is disabled" do
- before do
- subject.config do |c|
- c.scaling_disabled = true
- end
- end
-
- it "should not use the heroku client" do
- dont_allow(TestJob).heroku_client
- TestJob.after_perform_scale_workers
- end
- end
-
- describe "when the new worker count would should down some workers" do
- before do
- stub(TestJob).current_workers { 2 }
- end
- it "should not scale down workers since we don't want to accidentally shut down busy workers" do
- dont_allow(TestJob).set_workers
- TestJob.after_perform_scale_workers
- end
- end
end
describe ".on_failure_scale_workers" do
before do
+ Resque.redis.set('last_scaled', Time.now - 120)
stub(TestJob).heroku_client { @fake_heroku_client }
end
@@ -173,15 +121,33 @@ class AnotherJob
lambda { TestJob.on_failure_scale_workers("some", "random", "aguments", 42) }.should_not raise_error
end
+ end
+ describe ".calculate_and_set_workers" do
+ before do
+ Resque.redis.set('last_scaled', Time.now - 120)
+ stub(TestJob).heroku_client { @fake_heroku_client }
+ end
+
context "when the queue is empty" do
before do
+ @now = Time.now
+ Timecop.freeze(@now)
stub(Resque).info { {:pending => 0} }
end
+ after { Timecop.return }
+
it "should set workers to 0" do
mock(TestJob).set_workers(0)
- TestJob.on_failure_scale_workers
+ TestJob.calculate_and_set_workers
+ end
+
+ it "sets last scaled time" do
+ stub(TestJob).set_workers(0)
+
+ TestJob.calculate_and_set_workers
+ Resque.redis.get('last_scaled').should == @now.to_s
end
end
@@ -192,7 +158,20 @@ class AnotherJob
it "should keep workers at 1" do
mock(TestJob).set_workers(1)
- TestJob.on_failure_scale_workers
+ TestJob.calculate_and_set_workers
+ end
+
+ context "when scaling workers is disabled" do
+ before do
+ subject.config do |c|
+ c.scaling_disabled = true
+ end
+ end
+
+ it "should not use the heroku client" do
+ dont_allow(TestJob).heroku_client
+ TestJob.calculate_and_set_workers
+ end
end
end
@@ -212,30 +191,43 @@ class AnotherJob
it "should use the given block" do
mock(TestJob).set_workers(2)
- TestJob.on_failure_scale_workers
+ TestJob.calculate_and_set_workers
end
end
- context "when scaling workers is disabled" do
+ context "when the new worker count might shut down busy workers" do
before do
- subject.config do |c|
- c.scaling_disabled = true
- end
+ stub(TestJob).current_workers { 2 }
end
-
- it "should not use the heroku client" do
- dont_allow(TestJob).heroku_client
- TestJob.on_failure_scale_workers
+ it "should not scale down workers since we don't want to accidentally shut down busy workers" do
+ dont_allow(TestJob).set_workers
+ TestJob.calculate_and_set_workers
end
end
- describe "when the new worker count would should down some workers" do
+ describe "when we changed the worker count in less than minimum wait time" do
before do
- stub(TestJob).current_workers { 2 }
+ subject.config { |c| c.wait_time = 5}
+ @last_set = Time.parse("00:00:00")
+ Resque.redis.set('last_scaled', @last_set)
end
- it "should not scale down workers since we don't want to accidentally shut down busy workers" do
+
+ after { Timecop.return }
+
+ it "should not adjust the worker count" do
+ Timecop.freeze(@last_set + 4)
dont_allow(TestJob).set_workers
- TestJob.after_perform_scale_workers
+ TestJob.calculate_and_set_workers
+ end
+
+ context "when there are no jobs left" do
+ it "keeps checking for jobs and scales down if no job appeard" do
+ Timecop.freeze(@last_set)
+ mock(Resque).info.times(12) { {:pending => 0} }
+ mock(Kernel).sleep(0.5).times(10) { Timecop.freeze(@last_set += 0.5) }
+ mock(TestJob).set_workers(0)
+ TestJob.calculate_and_set_workers
+ end
end
end
end

0 comments on commit 0feba85

Please sign in to comment.