Skip to content
This repository has been archived by the owner on Mar 16, 2020. It is now read-only.

Commit

Permalink
Add die_after option to worker (limit number of job processed before …
Browse files Browse the repository at this point in the history
…worker die)
  • Loading branch information
JonathanTron committed Jun 28, 2011
1 parent 15e1ca0 commit 65526d6
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 5 deletions.
4 changes: 4 additions & 0 deletions bin/cyclop
Expand Up @@ -61,6 +61,10 @@ else
# Limit this worker to job queued by this host
# use "localhost" to let Cyclop set it to the host running the worker
limit_to_host: "server1.mydomain.tld"
# Exit worker after having processed x jobs
# nil : no limit
# [1-9]+: limit to this number
die_after: 100
# Load actions in this directory (default to ./actions)
actions: "/app/actions"
sleep_interval: 0.5 # in seconds
Expand Down
2 changes: 1 addition & 1 deletion lib/cyclop/version.rb
@@ -1,3 +1,3 @@
module Cyclop
VERSION = "0.1.2"
VERSION = "0.1.3"
end
13 changes: 12 additions & 1 deletion lib/cyclop/worker.rb
Expand Up @@ -8,6 +8,10 @@ class Worker
attr_accessor :sleep_interval
# Path to actions directory
attr_accessor :actions
# Number of jobs to process before exiting
attr_accessor :die_after
# Number of jobs processed by this worker
attr_accessor :processed_jobs
# Options passed to Cyclop.next to get next job
attr_accessor :job_opts

Expand All @@ -18,6 +22,8 @@ def initialize(config={})
self.logger = Logger.new(config["log_file"] || $stdout)
self.sleep_interval = config["sleep_interval"] || 1
self.actions = config["actions"] || "./actions"
self.processed_jobs = 0
self.die_after = config["die_after"]
@job_opts = {}
if config["limit_to_host"]
@job_opts[:host] = config["limit_to_host"]
Expand All @@ -44,7 +50,7 @@ def initialize(config={})
def run
register_signal_handlers
loop do
if @stop
if stop?
log "Shutting down..."
break
end
Expand All @@ -56,6 +62,7 @@ def run
procline msg
Process.wait
log "Child process #{@pid} ended with status: #{$?}"
self.processed_jobs += 1
if $?.exitstatus==0
job.complete!
else
Expand Down Expand Up @@ -131,5 +138,9 @@ def log(message)
def load_actions
Dir["#{actions}/*.rb"].each{|action| require action}
end

def stop?
@stop || (die_after && processed_jobs >= die_after.to_i)
end
end
end
44 changes: 41 additions & 3 deletions spec/cyclop/worker_spec.rb
Expand Up @@ -7,6 +7,7 @@
its(:logger){ should_not be_nil }
its(:sleep_interval){ should == 1 }
its(:actions){ should == "./actions" }
its(:processed_jobs){ should == 0 }

it "raise ArgumentError without mongo['database']" do
lambda {
Expand Down Expand Up @@ -43,13 +44,18 @@
})
end
context "with successful action" do
it "remove the job" do
job = Cyclop.push queue: "slow", job_params: ["tony@starkenterprises.com", :welcome]
before do
@job = Cyclop.push queue: "slow", job_params: ["tony@starkenterprises.com", :welcome]
t = Thread.new { worker.run }
sleep 1
worker.stop
t.join
Cyclop::Job.find(job._id).should be_nil
end
it "remove the job" do
Cyclop::Job.find(@job._id).should be_nil
end
it "increments the number of processed jobs" do
worker.processed_jobs.should == 1
end
end

Expand All @@ -73,6 +79,15 @@
job.failed.should be_true
job.attempts.should == 2
end

it "increments the number of processed jobs" do
job = Cyclop.push queue: "slow", job_params: ["tony@starkenterprises.com"]
t = Thread.new { worker.run }
sleep 1
worker.stop
t.join
worker.processed_jobs.should == 1
end
end

context "limiting to jobs queued by a given host" do
Expand All @@ -99,5 +114,28 @@
Cyclop::Job.find(job_local._id).should be_nil
end
end

context "limiting the number of jobs to process" do
let(:worker) do
Cyclop::Worker.new({
"log_file" => File.expand_path("../../../test.log", __FILE__),
"mongo" => {"database" => "cyclop_test"},
"actions" => File.expand_path("../../fixtures/actions", __FILE__),
"die_after" => 1,
})
end
it "only processes specified number jobs" do
job1 = Cyclop.push queue: "slow", job_params: ["tony@starkenterprises.com", :welcome]
job2 = Cyclop.push queue: "slow", job_params: ["tony@starkinternationals.com", :welcome]
2.times do
t = Thread.new { worker.run }
sleep 1
worker.stop
t.join
end
Cyclop::Job.find(job1._id).should be_nil
Cyclop::Job.find(job2._id).should == job2
end
end
end
end

0 comments on commit 65526d6

Please sign in to comment.