Permalink
Browse files

Adding master worker which controls a pool of workers

  • Loading branch information...
1 parent da90932 commit 0d5cb4aaac964089fcb8470bffd710fe5c88687d @andrewtimberlake committed Nov 29, 2012
Showing with 247 additions and 3 deletions.
  1. +2 −1 Gemfile
  2. +3 −1 Gemfile.lock
  3. +3 −1 cascade.gemspec
  4. +1 −0 lib/cascade.rb
  5. +107 −0 lib/cascade/master.rb
  6. +131 −0 spec/lib/cascade/master_spec.rb
View
@@ -1,9 +1,10 @@
source "http://rubygems.org"
gem "rake"
-gem "bundler", "~> 1.0.0"
+gem "bundler", "~> 1.0"
gem "mongo_mapper"
gem "bson_ext", "1.3.0"
+gem "kgio", "~> 2.7"
group :test do
gem "rspec", ">= 2.0.0"
View
@@ -11,6 +11,7 @@ GEM
builder (2.1.2)
diff-lcs (1.1.2)
i18n (0.5.0)
+ kgio (2.7.4)
mongo (1.3.0)
bson (>= 1.3.0)
mongo_mapper (0.9.0)
@@ -36,7 +37,8 @@ PLATFORMS
DEPENDENCIES
bson_ext (= 1.3.0)
- bundler (~> 1.0.0)
+ bundler (~> 1.0)
+ kgio (~> 2.7)
mongo_mapper
rake
rcov
View
@@ -26,11 +26,13 @@ Gem::Specification.new do |s|
if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
s.add_development_dependency(%q<bundler>, [">= 1.0"])
+ s.add_development_dependency(%q<kgio>, ["~> 2.7"])
else
s.add_dependency(%q<bundler>, [">= 1.0"])
+ s.add_dependency(%q<kgio>, ["~> 2.7"])
end
else
s.add_dependency(%q<bundler>, [">= 1.0"])
+ s.add_dependency(%q<kgio>, ["~> 2.7"])
end
end
-
View
@@ -1,3 +1,4 @@
+require 'cascade/master'
require 'cascade/worker'
require 'cascade/job'
require 'cascade/job_spec'
View
@@ -0,0 +1,107 @@
+require 'kgio'
+
+module Cascade
+ class Master
+ def initialize(worker_class=Worker)
+ @worker_class = worker_class
+ end
+ attr_reader :worker_class
+ attr_accessor :children
+
+ CHILDREN = []
+ SIG_QUEUE = []
+ SELF_PIPE = []
+
+ def start
+ init_self_pipe!
+
+ [:TERM, :INT, :QUIT, :TTIN, :TTOU, :CHLD].each do |sig|
+ trap(sig) do
+ SIG_QUEUE << sig
+ awaken_master
+ end
+ end
+
+ (0..1).each do |i|
+ CHILDREN << fork_worker(i)
+ end
+
+ loop do
+ process_signals
+ break if $exit
+ master_sleep
+ break if $exit
+ end
+
+ Process.waitall
+ end
+
+ def init_self_pipe!
+ SELF_PIPE.each { |io| io.close rescue nil }
+ SELF_PIPE.replace(Kgio::Pipe.new)
+ SELF_PIPE.each { |io| io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
+ end
+
+ def awaken_master
+ SELF_PIPE[1].kgio_trywrite('.') # wakeup master process from select
+ end
+
+ def master_sleep(sec=2)
+ IO.select([ SELF_PIPE[0] ], nil, nil, sec) or return
+ SELF_PIPE[0].kgio_tryread(11)
+ end
+
+ def process_signals
+ while true
+ case SIG_QUEUE.shift
+ when nil
+ return true
+ when :TERM, :INT, :QUIT
+ stop
+ when :TTIN
+ CHILDREN << fork_worker(CHILDREN.size)
+ when :TTOU
+ pid = CHILDREN.pop
+ Process.kill(:QUIT, pid)
+ when :CHLD
+ reap_all_workers
+ end
+ end
+ end
+
+ def stop
+ CHILDREN.each do |pid|
+ Process.kill(:QUIT, pid)
+ end
+ $exit = true
+ end
+
+ def reap_all_workers
+ begin
+ while pid = Process.wait(-1, Process::WNOHANG)
+ index = CHILDREN.index(pid)
+ break unless index #If we're reducing the children through SIGTTOU then we won't find the child and we don't want to refork
+ unless $exit
+ CHILDREN[index] = fork_worker(index+1)
+ end
+ end
+ rescue Errno::ECHILD
+ #Ignore if we have no children
+ end
+ end
+
+ def fork_worker(number)
+ fork do
+ [:TTIN, :TTOU].each do |sig|
+ trap(sig) {}
+ end
+ [:TERM, :INT, :QUIT].each do |sig|
+ trap(sig) do
+ $exit = true
+ end
+ end
+ worker_class.new(number+1).start
+ end
+ end
+ end
+end
@@ -0,0 +1,131 @@
+require 'spec_helper'
+
+module Cascade
+ describe Master do
+ def capture_output
+ old_stdout = STDOUT.clone
+ pipe_r, pipe_w = IO.pipe
+ pipe_r.sync = true
+ output = ""
+ reader = Thread.new do
+ begin
+ loop do
+ output << pipe_r.readpartial(1024)
+ end
+ rescue EOFError
+ end
+ end
+ STDOUT.reopen(pipe_w)
+ yield
+ ensure
+ STDOUT.reopen(old_stdout)
+ pipe_w.close
+ reader.join
+ return output
+ end
+
+ before do
+ $stdout.sync = true
+ end
+
+ it "master and all workers should exit on TERM" do
+ output = capture_output do
+ master = fork do
+ Master.new(TestWorker).start
+ end
+ sleep 0.2
+ Process.kill(:TERM, master)
+ Process.waitall
+ end.split(/\n/)
+
+ #puts "output: #{output.inspect}"
+ output.should include("Starting worker 1")
+ output.should include("Starting worker 2")
+ output.should include("Stopping worker 1")
+ output.should include("Stopping worker 2")
+ end
+
+ it "auto restarts a worker which dies" do
+ output = capture_output do
+ master = fork do
+ Master.new(DyingWorker).start
+ end
+ sleep 0.2
+ Process.kill(:TERM, master)
+ Process.waitall
+ end.split(/\n/)
+
+ #puts "output: #{output.inspect}"
+ output.select{|line| line =~ /^Starting worker/ }.should have(4).lines
+ output.should include("Dying worker 1")
+ end
+
+ it "adds a worker on SIGTTIN" do
+ output = capture_output do
+ master = fork do
+ Master.new(TestWorker).start
+ end
+ sleep 0.2
+ Process.kill(:TTIN, master)
+ sleep 0.2
+ Process.kill(:TERM, master)
+ Process.waitall
+ end.split(/\n/)
+
+ #puts "output: #{output.inspect}"
+ output.select{|line| line =~ /^Starting worker/ }.should have(3).lines
+ output.select{|line| line =~ /^Stopping worker/ }.should have(3).lines
+ end
+
+ it "removes a worker on SIGTTOU" do
+ output = capture_output do
+ master = fork do
+ Master.new(TestWorker).start
+ end
+ sleep 0.2
+ Process.kill(:TTOU, master)
+ sleep 0.2
+ print "Should now have 1 process\n"
+ sleep 0.2
+ Process.kill(:TERM, master)
+ Process.waitall
+ end.split(/\n/)
+
+ #puts "output: #{output.inspect}"
+ output.should == ['Starting worker 1', 'Starting worker 2', 'Stopping worker 2', 'Should now have 1 process', 'Stopping worker 1']
+ end
+ end
+
+ class TestWorker
+ def initialize(number)
+ @number = number
+ end
+ attr_reader :number
+
+ def start
+ print "Starting worker #{number}\n"
+ run_loop
+ print "Stopping worker #{number}\n"
+ end
+
+ def run_loop
+ loop do
+ sleep 0.1
+ in_loop
+ break if $exit
+ end
+ end
+
+ def in_loop
+ # Do nothing
+ end
+ end
+
+ class DyingWorker < TestWorker
+ def in_loop
+ print "Dying worker #{number}\n"
+ exit(1)
+ end
+ end
+
+end

0 comments on commit 0d5cb4a

Please sign in to comment.