Permalink
Browse files

more refactors and integration tests

  • Loading branch information...
1 parent 832919e commit d433117fb582f31a62f6bed69fe83339d090d109 @freels committed Oct 6, 2010
Showing with 152 additions and 60 deletions.
  1. +29 −22 lib/kestrel/client/transactional.rb
  2. +123 −38 spec/kestrel/client/transactional_spec.rb
View
51 lib/kestrel/client/transactional.rb
@@ -42,59 +42,66 @@ def initialize(client, max_retries = nil, error_rate = nil)
def get(key, opts = {})
raise MultipleQueueException if current_queue && key != current_queue
- close_transaction(queue_for_last_job) if @job
+ close_last_transaction
- queue = (rand < @error_rate) ? key + "_errors" : key
-
- if job = client.get(queue, opts.merge(:open => true))
- @current_queue = key
+ if job = client.get(normal_or_error_queue(key), opts.merge(:open => true))
@job = job.is_a?(RetryableJob) ? job : RetryableJob.new(0, job)
+ @current_is_retry = job.is_a? RetryableJob
+ @current_queue = key
@job.job
- else
- @current_queue = @job = nil
end
end
- def queue_for_last_job
- if @job.retries < 1
- @current_queue
- else
- current_queue + "_errors"
- end
+ def current_try
+ @job.retries + 1
end
# Enqueues the current job on the error queue for later
# retry. If the job has been retried DEFAULT_RETRIES times,
# gives up entirely.
#
# ==== Returns
- # Boolean:: true if the job is retryable, false otherwise
+ # Boolean:: true if the job is enqueued in the retry queue, false otherwise
+ #
#
def retry(item = nil)
+ enqueued_retry = false
+
job =
if item
current_retries = (@job ? @job.retries : 0)
RetryableJob.new(current_retries, item)
else
- @job
+ @job.dup
end
return unless job
- queue = queue_for_last_job
job.retries += 1
client.set(current_queue + "_errors", job) if job.retries < @max_retries
- close_transaction(queue)
-
- # No longer have an active job
- @current_queue = @job = nil
+ close_last_transaction
+
job.retries < @max_retries
end
+ def close_last_transaction #:nodoc:
+ return unless @job
+
+ queue_for_last_job =
+ if @current_is_retry
+ current_queue + "_errors"
+ else
+ current_queue
+ end
+
+ client.get_from_last("#{queue_for_last_job}/close")
+ @current_is_retry = @current_queue = @job = nil
+ end
+
private
- def close_transaction(key) #:nodoc:
- client.get_from_last("#{key}/close")
+ def normal_or_error_queue(key)
+ (rand < @error_rate) ? key + "_errors" : key
end
end
View
161 spec/kestrel/client/transactional_spec.rb
@@ -1,35 +1,101 @@
require 'spec/spec_helper'
describe "Kestrel::Client::Transactional" do
+ before do
+ @raw_kestrel_client = Kestrel::Client.new(*Kestrel::Config.default)
+ @kestrel = Kestrel::Client::Transactional.new(@raw_kestrel_client)
+ @queue = "some_queue"
+ end
+
+ describe "integration" do
+ def get_job
+ job = nil
+ job = @kestrel.get(@queue) until job
+ job
+ end
+
+ it "processes normal jobs" do
+ returns = [:mcguffin]
+ stub(@raw_kestrel_client).get(@queue, anything) { returns.shift }
+ stub(@raw_kestrel_client).get(@queue + "_errors", anything)
+ mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+
+ get_job.should == :mcguffin
+ @kestrel.current_try.should == 1
+ @kestrel.get(@queue) # simulate next get run
+ end
+
+ it "processes successful retries" do
+ returns = [Kestrel::Client::Transactional::RetryableJob.new(1, :mcguffin)]
+ stub(@raw_kestrel_client).get(@queue + "_errors", anything) { returns.shift }
+ stub(@raw_kestrel_client).get(@queue, anything)
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+
+ get_job.should == :mcguffin
+ @kestrel.current_try.should == 2
+ @kestrel.get(@queue) # simulate next get run
+ end
+
+ it "processes normal jobs that should retry" do
+ returns = [:mcguffin]
+ stub(@raw_kestrel_client).get(@queue, anything) { returns.shift }
+ stub(@raw_kestrel_client).get(@queue + "_errors", anything)
+ mock(@raw_kestrel_client).set(@queue + "_errors", anything) do |q,j|
+ j.retries.should == 1
+ j.job.should == :mcguffin
+ end
+ mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+
+ get_job.should == :mcguffin
+ @kestrel.current_try.should == 1
+
+ @kestrel.retry
+ @kestrel.get(@queue) # simulate next get run
+ end
+
+ it "processes retries that should retry" do
+ returns = [Kestrel::Client::Transactional::RetryableJob.new(1, :mcguffin)]
+ stub(@raw_kestrel_client).get(@queue + "_errors", anything) { returns.shift }
+ stub(@raw_kestrel_client).get(@queue, anything)
+ mock(@raw_kestrel_client).set(@queue + "_errors", anything) do |q,j|
+ j.retries.should == 2
+ j.job.should == :mcguffin
+ end
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+
+ get_job.should == :mcguffin
+ @kestrel.current_try.should == 2
+
+ @kestrel.retry
+ @kestrel.get(@queue) # simulate next get run
+ end
+
+ it "processes retries that should give up" do
+ returns = [Kestrel::Client::Transactional::RetryableJob.new(Kestrel::Client::Transactional::DEFAULT_RETRIES - 1, :mcguffin)]
+ stub(@raw_kestrel_client).get(@queue + "_errors", anything) { returns.shift }
+ stub(@raw_kestrel_client).get(@queue, anything)
+ mock(@raw_kestrel_client).set.never
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+
+ get_job.should == :mcguffin
+ @kestrel.current_try.should == Kestrel::Client::Transactional::DEFAULT_RETRIES
+
+ @kestrel.retry
+ @kestrel.get(@queue) # simulate next get run
+ end
+ end
+
describe "Instance Methods" do
before do
- @raw_kestrel_client = Kestrel::Client.new(*Kestrel::Config.default)
- @kestrel = Kestrel::Client::Transactional.new(@raw_kestrel_client)
stub(@kestrel).rand { 1 }
- @queue = "some_queue"
end
describe "#get" do
-
it "asks for a transaction" do
mock(@raw_kestrel_client).get(@queue, :open => true) { :mcguffin }
@kestrel.get(@queue).should == :mcguffin
end
- it "gets from the error queue ERROR_PROCESSING_RATE pct. of the time" do
- mock(@kestrel).rand { Kestrel::Client::Transactional::ERROR_PROCESSING_RATE - 0.05 }
- mock(@raw_kestrel_client).get(@queue + "_errors", anything) { :mcguffin }
- mock(@raw_kestrel_client).get(@queue, anything).never
- @kestrel.get(@queue).should == :mcguffin
- end
-
- it "gets from the normal queue most of the time" do
- mock(@kestrel).rand { Kestrel::Client::Transactional::ERROR_PROCESSING_RATE + 0.05 }
- mock(@raw_kestrel_client).get(@queue, anything) { :mcguffin }
- mock(@raw_kestrel_client).get(@queue + "_errors", anything).never
- @kestrel.get(@queue).should == :mcguffin
- end
-
it "is nil when the primary queue is empty and selected" do
mock(@kestrel).rand { Kestrel::Client::Transactional::ERROR_PROCESSING_RATE + 0.05 }
mock(@raw_kestrel_client).get(@queue, anything) { nil }
@@ -51,7 +117,6 @@
end
@kestrel.get(@queue).should == :mcmuffin
- @kestrel.queue_for_last_job.should == @queue + "_errors"
end
it "closes an open transaction with no retries" do
@@ -83,25 +148,6 @@
end
end
- describe "#queue_for_last_job" do
-
- it "returns primary queue for jobs that have not been retried" do
- mock(@raw_kestrel_client).get(@queue, anything) { :mcguffin }
- @kestrel.get(@queue)
- @kestrel.queue_for_last_job.should == @queue
- end
-
- it "returns error queue for a RetryableJob" do
- stub(@kestrel).rand { 0 }
- mock(@raw_kestrel_client).get(@queue + "_errors", anything) do
- Kestrel::Client::Transactional::RetryableJob.new(1, :mcmuffin)
- end
- @kestrel.get(@queue)
- @kestrel.queue_for_last_job.should == @queue + "_errors"
- end
-
- end
-
describe "#retry" do
before do
stub(@raw_kestrel_client).get(@queue, anything) { :mcmuffin }
@@ -169,7 +215,46 @@
mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
@kestrel.retry
end
+ end
+
+ describe "#normal_or_error_queue" do
+ it "returns the error queue ERROR_PROCESSING_RATE pct. of the time" do
+ mock(@kestrel).rand { Kestrel::Client::Transactional::ERROR_PROCESSING_RATE - 0.05 }
+ @kestrel.send(:normal_or_error_queue, @queue).should == @queue + "_errors"
+ end
+
+ it "returns the normal queue most of the time" do
+ mock(@kestrel).rand { Kestrel::Client::Transactional::ERROR_PROCESSING_RATE + 0.05 }
+ @kestrel.send(:normal_or_error_queue, @queue).should == @queue
+ end
+ end
+ describe "#close_last_transaction" do
+ it "does nothing if there is no last transaction" do
+ mock(@raw_kestrel_client).get_from_last(@queue + "/close").never
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close").never
+ @kestrel.send(:close_last_transaction)
+ end
+
+ it "closes the normal queue if the job was pulled off of the normal queue" do
+ mock(@kestrel).normal_or_error_queue(@queue) { @queue }
+ mock(@raw_kestrel_client).get(@queue, :open => true) { :mcguffin }
+ mock(@raw_kestrel_client).get_from_last(@queue + "/close")
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close").never
+
+ @kestrel.get(@queue).should == :mcguffin
+ @kestrel.send(:close_last_transaction)
+ end
+
+ it "closes the error queue if the job was pulled off of the error queue" do
+ mock(@kestrel).normal_or_error_queue(@queue) { @queue + "_errors" }
+ mock(@raw_kestrel_client).get(@queue + "_errors", anything) { Kestrel::Client::Transactional::RetryableJob.new 1, :mcguffin }
+ mock(@raw_kestrel_client).get_from_last(@queue + "/close").never
+ mock(@raw_kestrel_client).get_from_last(@queue + "_errors/close")
+
+ @kestrel.get(@queue).should == :mcguffin
+ @kestrel.send(:close_last_transaction)
+ end
end
end
end

0 comments on commit d433117

Please sign in to comment.