Skip to content

Commit

Permalink
Merge ca8ede6 into b8ecbf1
Browse files Browse the repository at this point in the history
  • Loading branch information
ndbroadbent committed Jul 8, 2014
2 parents b8ecbf1 + ca8ede6 commit f598020
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 18 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ You can then do the following:
RAILS_ENV=production script/delayed_job --queue=tracking start
RAILS_ENV=production script/delayed_job --queues=mailers,tasks start

# Use the --pool option to specify a worker pool. You can use this option multiple times to start different numbers of workers for different queues.
# The following command will start 1 worker for the tracking queue,
# 2 workers for the mailers and tasks queues, and 2 workers for any jobs:
RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start

# Runs all available jobs and then exits
RAILS_ENV=production script/delayed_job start --exit-on-complete
# or to run in the foreground
Expand Down
68 changes: 50 additions & 18 deletions lib/delayed/command.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
begin
require 'daemons'
rescue LoadError
raise "You need to add gem 'daemons' to your Gemfile if you wish to use it."
unless ENV['RAILS_ENV'] == 'test'
begin
require 'daemons'
rescue LoadError
raise "You need to add gem 'daemons' to your Gemfile if you wish to use it."
end
end
require 'optparse'

module Delayed
class Command
attr_accessor :worker_count
attr_accessor :worker_count, :worker_pools

def initialize(args)
@options = {
Expand Down Expand Up @@ -61,6 +63,9 @@ def initialize(args)
opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue|
@options[:queues] = queue.split(',')
end
opt.on('--pool=queue1[,queue2][:worker_count]', "Specify queues and number of workers for a worker pool") do |pool|
parse_worker_pool(pool)
end
opt.on('--exit-on-complete', 'Exit when no more jobs are available to run. This will exit if all jobs are scheduled to run in the future.') do
@options[:exit_on_complete] = true
end
Expand All @@ -72,42 +77,69 @@ def daemonize
dir = @options[:pid_dir]
Dir.mkdir(dir) unless File.exist?(dir)

if @options[:identifier]
if @worker_count > 1
fail(ArgumentError.new('Cannot specify both --number-of-workers and --identifier'))
elsif @worker_count == 1
process_name = "delayed_job.#{@options[:identifier]}"
run_process(process_name, dir)
if @worker_pools
worker_index = 0
@worker_pools.each do |queues, worker_count|
options = @options.merge(:queues => queues)
worker_count.times do
process_name = "delayed_job.#{worker_index}"
run_process(process_name, options)
worker_index += 1
end
end

elsif @worker_count > 1 && @options[:identifier]
fail(ArgumentError.new('Cannot specify both --number-of-workers and --identifier'))

elsif @worker_count == 1 && @options[:identifier]
process_name = "delayed_job.#{@options[:identifier]}"
run_process(process_name, @options)

else
worker_count.times do |worker_index|
process_name = worker_count == 1 ? 'delayed_job' : "delayed_job.#{worker_index}"
run_process(process_name, dir)
process_name = worker_count == 1 ? "delayed_job" : "delayed_job.#{worker_index}"
run_process(process_name, @options)
end
end
end

def run_process(process_name, dir)
def run_process(process_name, options = {})
Delayed::Worker.before_fork
Daemons.run_proc(process_name, :dir => dir, :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args|
$0 = File.join(@options[:prefix], process_name) if @options[:prefix]
Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*args|
$0 = File.join(options[:prefix], process_name) if @options[:prefix]
run process_name
end
end

def run(worker_name = nil)
def run(worker_name = nil, options = {})
Dir.chdir(Rails.root)

Delayed::Worker.after_fork
Delayed::Worker.logger ||= Logger.new(File.join(Rails.root, 'log', 'delayed_job.log'))

worker = Delayed::Worker.new(@options)
worker = Delayed::Worker.new(options)
worker.name_prefix = "#{worker_name} "
worker.start
rescue => e
Rails.logger.fatal e
STDERR.puts e.message
exit 1
end


private

def parse_worker_pool(pool)
@worker_pools ||= []

queues, worker_count = pool.split(':')
if ['*', '', nil].include?(queues)
queues = []
else
queues = queues.split(',')
end
worker_count = (worker_count || 1).to_i rescue 1
@worker_pools << [queues, worker_count]
end
end
end
57 changes: 57 additions & 0 deletions spec/delayed/command_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
require 'helper'
require 'delayed/command'

describe Delayed::Command do
describe "parsing --pool argument" do
it "should parse --pool correctly" do
command = Delayed::Command.new(['--pool=*:1', '--pool=test_queue:4', '--pool=mailers,misc:2'])

expect(command.worker_pools).to eq [
[ [], 1 ],
[ ['test_queue'], 4 ],
[ ['mailers', 'misc'], 2 ]
]
end

it "should allow * or blank to specify any pools" do
command = Delayed::Command.new(['--pool=*:4'])
expect(command.worker_pools).to eq [
[ [], 4 ],
]

command = Delayed::Command.new(['--pool=:4'])
expect(command.worker_pools).to eq [
[ [], 4 ],
]
end

it "should default to one worker if not specified" do
command = Delayed::Command.new(['--pool=mailers'])
expect(command.worker_pools).to eq [
[ ['mailers'], 1 ],
]
end
end

describe "running worker pools defined by multiple --pool arguments" do
it "should run the correct worker processes" do
command = Delayed::Command.new(['--pool=*:1', '--pool=test_queue:4', '--pool=mailers,misc:2'])

expect(Dir).to receive(:mkdir).with('./tmp/pids').once

[
["delayed_job.0", {:quiet=>true, :pid_dir=>"./tmp/pids", :queues=>[]}],
["delayed_job.1", {:quiet=>true, :pid_dir=>"./tmp/pids", :queues=>["test_queue"]}],
["delayed_job.2", {:quiet=>true, :pid_dir=>"./tmp/pids", :queues=>["test_queue"]}],
["delayed_job.3", {:quiet=>true, :pid_dir=>"./tmp/pids", :queues=>["test_queue"]}],
["delayed_job.4", {:quiet=>true, :pid_dir=>"./tmp/pids", :queues=>["test_queue"]}],
["delayed_job.5", {:quiet=>true, :pid_dir=>"./tmp/pids", :queues=>["mailers", "misc"]}],
["delayed_job.6", {:quiet=>true, :pid_dir=>"./tmp/pids", :queues=>["mailers", "misc"]}]
].each do |args|
expect(command).to receive(:run_process).with(*args).once
end

command.daemonize
end
end
end
6 changes: 6 additions & 0 deletions spec/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
Delayed::Worker.logger = Logger.new('/tmp/dj.log')
ENV['RAILS_ENV'] = 'test'

class Rails
def self.root
'.'
end
end

Delayed::Worker.backend = :test

# Add this directory so the ActiveSupport autoloading works
Expand Down

0 comments on commit f598020

Please sign in to comment.