Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

WIP: cleaning up

  • Loading branch information...
commit b4dc003022aa42fd7050fe252c1f4bf10c3774b7 1 parent b95c741
Brandon Keepers bkeepers authored
Showing with 72 additions and 55 deletions.
  1. +59 −53 lib/delayed/master.rb
  2. +1 −1  lib/delayed_job.rb
  3. +12 −1 spec/master_spec.rb
112 lib/delayed/master.rb
View
@@ -9,106 +9,112 @@ def initialize(options = {})
end
def start
- if GC.respond_to?(:copy_on_write_friendly=)
- GC.copy_on_write_friendly = true
- end
-
abort "Process is already running with pid #{pid}" if running?
if pid_file.file?
- warn "Deleting stale pid file at #{pid_file}"
+ logger.info "Deleting stale pid file at #{pid_file}"
pid_file.delete
end
+ GC.copy_on_write_friendly = true if GC.respond_to?(:copy_on_write_friendly=)
+
require 'config/environment'
Delayed::Job.before_fork
- @pid = fork do
- $0 = 'delayed_job'
+ @pid = fork { run }
- [:TERM, :INT, :QUIT].each do |sig|
- trap sig do
- logger.info "SIG#{sig} received. Shutting down."
+ # Wait for process to finish starting
+ sleep 0.1 until running? && pid_file.file?
- # # kill the children and reap them before terminating
- # Process.kill :TERM, *children.keys
- # Process.waitall
+ # detach the master process and exit;
+ # note that Process#detach calls setsid(2)
+ Process.detach @pid
+ @pid
+ end
- pid_file.delete if pid_file.file?
+ def run
+ $0 = 'delayed_job'
- # reset trap handlers so we don't get caught in a trap loop
- trap sig, 'DEFAULT'
+ [:TERM, :INT, :QUIT].each do |sig|
+ trap sig do
+ logger.info "SIG#{sig} received. Shutting down."
- # propagate the signal like a proper process should
- Process.kill sig, $$
+ # reset trap handlers so we don't get caught in a trap loop
+ trap :CLD, 'DEFAULT'
+ trap sig, 'DEFAULT'
- # FIXME: I don't understand why, but process will not stop without following
- Process.wait
+ # kill the children and reap them before terminating
+ unless children.keys.empty?
+ logger.debug "Terminating workers: #{children.inspect}"
+ Process.kill :TERM, *children.keys
end
- end
-
- # Write pid file
- pid_file.dirname.mkpath
- pid_file.open('w') { |f| f.write $$ }
+ Process.waitall
+ logger.info "Terminated workers"
- logger.info "Starting with #{worker_count} workers"
+ pid_file.delete if pid_file.file?
- # # silence output like a proper daemon
- # [$stdin, $stdout, $stderr].each { |io| io.reopen '/dev/null' }
-
- Delayed::Job.after_fork
-
- run
+ # propagate the signal like a proper process should
+ Process.kill sig, $$
+ Process.waitall
+ end
end
- # Wait for process to finish starting
- sleep 0.1 until running? && pid_file.file?
+ # # silence output like a proper daemon
+ [STDIN, STDOUT, STDERR].each { |io| io.reopen '/dev/null' }
- # # detach the master process and exit;
- # # note that Process#detach calls setsid(2)
- # Process.detach pid
- pid
- end
+ Delayed::Job.after_fork
- def run
# Spawn a new worker when one dies
trap :CLD do
- id = children.delete Process.wait
- spawn_worker(id)
+ handle_child_death
end
+ # Write pid file
+ pid_file.dirname.mkpath
+ pid_file.open('w') { |f| f.write $$ }
+
+ logger.info "Starting with #{worker_count} workers"
+
# Create the worker ids
worker_count.times {|id| available_workers << id }
loop do
- available_workers.each do |id|
- if !spawn_worker(id)
- sleep 5
- end
+ logger.debug "available_workers: #{available_workers.inspect}"
+ logger.debug "busy workers: #{children.values.inspect}"
+ while id = available_workers.shift
+ sleep 5 if !spawn_worker(id)
end
+ logger.debug "No workers available, waiting for child death"
+ handle_child_death
end
end
+ def handle_child_death
+ id = children.delete(Process.wait)
+ # available_workers << children.delete(Process.wait)
+ logger.debug "Worker #{id} reaped. status:#{$?.exitstatus}"
+ spawn_worker id
+ end
+
def spawn_worker(id)
- logger.debug "Spawning worker #{id}"
worker = Worker.new(id)
job = Delayed::Job.reserve(worker.name)
if job
- available_workers.delete(id)
-
+ logger.debug "Reserved job #{job.id} for worker #{id}"
Delayed::Job.before_fork
pid = fork do
# $0 = "delayed_worker.#{id}"
- #
- # # reset all inherited traps from main process
- # [:CLD, :HUP, :TERM, :INT, :QUIT].each { |sig| trap sig, 'DEFAULT' }
+
+ # reset all inherited traps from main process
+ [:CLD, :HUP, :TERM, :INT, :QUIT].each { |sig| trap sig, 'DEFAULT' }
Delayed::Job.after_fork
worker.run(job)
end
-
children[pid] = id
+ logger.debug "Forked worker #{id}. pid:#{pid}"
pid
else
+ logger.debug "No jobs available for worker #{id}"
available_workers << id
false
end
2  lib/delayed_job.rb
View
@@ -7,11 +7,11 @@
require File.dirname(__FILE__) + '/delayed/backend/base'
require File.dirname(__FILE__) + '/delayed/worker'
require File.dirname(__FILE__) + '/delayed/railtie' if defined?(Rails::Railtie)
+require 'delayed/logger_formatter'
module Delayed
autoload :Command, 'delayed/command'
autoload :Master, 'delayed/master'
- autoload :LoggerFormatter, 'delayed/logger_formatter'
end
Object.send(:include, Delayed::MessageSending)
13 spec/master_spec.rb
View
@@ -27,6 +27,13 @@
@master.pid_file.dirname.directory?.should be_true
end
+ it "should detach process" do
+ @master.start
+ pid = @master.pid
+ Process.kill :TERM, pid
+ wait_until {`ps -ho pid,state -p #{pid}`.should_not include(pid.to_s) }
+ end
+
context "with a stale pid file" do
before do
@pid = fork {}
@@ -106,6 +113,10 @@
end
describe "spawn_worker" do
+ before do
+ Delayed::Job.delete_all
+ end
+
it "should find and run a job" do
job = Delayed::Job.enqueue SimpleJob.new
Process.wait(@master.spawn_worker(0))
@@ -115,7 +126,7 @@
it "should not run if a job does not exist" do
@master.should_not_receive(:fork)
- @master.spawn_worker(0)
+ @master.spawn_worker(0).should be_false
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.