Skip to content

Commit

Permalink
Added enhancements to Worker.
Browse files Browse the repository at this point in the history
* They are now aware of shutdowns that occur, and will terminate
  themselves.
* They keep track of messages_processed after every run_loop().
* They pass message IDs to the WorkerError class.
  • Loading branch information
dbalatero committed Jun 6, 2009
1 parent 00cf81f commit 91d834a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
6 changes: 5 additions & 1 deletion lib/queue_stick/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ def delete_message_from_queue(message)
end

def run_loop
# Exit the run loop if we get a shutdown request.
Thread.current.exit if Thread.current[:shutdown]

message = get_message_from_queue

begin
process(message.body)
delete_message_from_queue(message)
counter(:messages_processed).increment!
rescue Exception => process_error
error = WorkerError.new(:dummy_id) # TODO(dbalatero): message_id?!
error = WorkerError.new(message.id)
error.exceptions << process_error
begin
recover
Expand Down
19 changes: 19 additions & 0 deletions spec/queue_stick/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ class RunLoopTestWorker < QueueStick::Worker
}.should_not raise_error
end

it "should automatically increment messages_processed on each loop" do
worker = QueueStick::MockWorker.new
worker.run_loop
worker.counter(:messages_processed).counts[0].should == 1
worker.run_loop
worker.counter(:messages_processed).counts[0].should == 2
end

it "should call delete_message_from_queue if process succeeds" do
@worker.should_receive(:process).and_return(true)
@worker.should_receive(:delete_message_from_queue).and_return(true)
Expand All @@ -53,6 +61,17 @@ class RunLoopTestWorker < QueueStick::Worker
@worker.run_loop
end

it "should terminate after a run if the current thread's :shutdown variable is set" do
worker = QueueStick::MockWorker.new
thread = Thread.new do
worker.run_loop while true
end
thread[:shutdown] = true

thread.join
thread.should_not be_alive
end

it "should log errors when process fails" do
@worker.should_receive(:process).and_raise(Exception)
@worker.should_receive(:recover).and_return(true)
Expand Down

0 comments on commit 91d834a

Please sign in to comment.