Skip to content

Commit

Permalink
Convert stager from fibers to threads
Browse files Browse the repository at this point in the history
There are a couple of reasons for this change. Firstly, we don't need
a massive amount of concurrency. Threads will serve us well enough
and the resulting code is easier to read, maintain, and reason about.
Secondly, this will ease the transition to using Warden containers
for staging applications.

I also took the opportunity to clean up some tests as I was refactoring.

Test plan:

- Unit tests pass (and are expanded).
- BVTs pass
- Manual testing using a script that executes parallel deploys succeeds.

Change-Id: Ibb883a588de59d5d551bbd8e03261c45dd924580
  • Loading branch information
mpage committed Apr 30, 2012
1 parent 45591af commit 6a5a9d0
Show file tree
Hide file tree
Showing 17 changed files with 677 additions and 466 deletions.
1 change: 1 addition & 0 deletions Gemfile
Expand Up @@ -8,6 +8,7 @@ gem 'yajl-ruby', '>= 0.7.9'
gem 'vcap_common', '~> 1.0.8'
gem 'vcap_logging', '>= 0.1.3'
gem 'vcap_staging', '~> 0.1.52'
gem 'vcap-concurrency', '~> 0.0.1'

group :test do
gem 'rspec'
Expand Down
2 changes: 2 additions & 0 deletions Gemfile.lock
Expand Up @@ -36,6 +36,7 @@ GEM
rack (>= 1.0.0)
tilt (1.3.2)
uuidtools (2.1.2)
vcap-concurrency (0.0.1)
vcap_common (1.0.9)
eventmachine (~> 0.12.11.cloudfoundry.3)
nats (~> 0.4.22.beta.8)
Expand Down Expand Up @@ -66,6 +67,7 @@ DEPENDENCIES
rake
rspec
sinatra
vcap-concurrency (~> 0.0.1)
vcap_common (~> 1.0.8)
vcap_logging (>= 0.1.3)
vcap_staging (~> 0.1.52)
Expand Down
7 changes: 4 additions & 3 deletions bin/stager
Expand Up @@ -8,6 +8,7 @@ $LOAD_PATH.unshift(File.expand_path('../../lib', __FILE__))

require 'vcap/common'
require 'vcap/component'
require 'vcap/concurrency'
require 'vcap/stager'

sleep_interval = 1
Expand Down Expand Up @@ -51,8 +52,8 @@ VCAP::Stager.init(config)
user_mgr=nil
if config[:secure]
VCAP::Stager::SecureUserManager.instance.setup
user_mgr = VCAP::Stager::SecureUserManager.instance
user_mgr = VCAP::Concurrency::Proxy.new(VCAP::Stager::SecureUserManager.instance)
end
task_mgr = VCAP::Stager::TaskManager.new(config[:max_active_tasks], user_mgr)
server = VCAP::Stager::Server.new(config[:nats_uri], task_mgr, config)
thread_pool = VCAP::Concurrency::ThreadPool.new(config[:max_active_tasks])
server = VCAP::Stager::Server.new(config[:nats_uri], thread_pool, user_mgr, config)
server.run
4 changes: 2 additions & 2 deletions lib/vcap/stager.rb
@@ -1,5 +1,6 @@
require 'vcap/stager/constants'
require 'vcap/stager/config'
require 'vcap/stager/process_runner'
require 'vcap/stager/server'
require 'vcap/stager/task'
require 'vcap/stager/task_error'
Expand All @@ -8,6 +9,7 @@
require 'vcap/stager/task_result'
require 'vcap/stager/util'
require 'vcap/stager/version'
require 'vcap/stager/workspace'

module VCAP
module Stager
Expand All @@ -20,8 +22,6 @@ def init(config)
StagingPlugin.manifest_root = config[:dirs][:manifests]
StagingPlugin.load_all_manifests
StagingPlugin.validate_configuration!
VCAP::Stager::Task.set_defaults(config)
VCAP::Stager::Task.set_defaults({:manifest_dir => config[:dirs][:manifests]})
end
end
end
Expand Down
99 changes: 99 additions & 0 deletions lib/vcap/stager/process_runner.rb
@@ -0,0 +1,99 @@
module VCAP
module Stager
end
end

class VCAP::Stager::ProcessRunner
MAX_READLEN = 1024 * 1024

def initialize(logger)
@logger = logger
end

# Runs a command and captures stdout/stderr/status
#
# @param [String] cmd The command to run.
# @param [Hash] opts
# @option opts [Integer] :timeout How long the process is allowed to run for
#
# @return [Hash] A hash with the following keys:
# :stdout => String
# :stderr => String
# :timed_out => Boolean
# :status => Process::Status
def run(cmd, opts = {})
pipes = [IO.pipe, IO.pipe]

child_pid = Process.spawn(cmd, :out => pipes[0][1], :err => pipes[1][1])

# Only need the read side in parent
pipes.each { |ios| ios[1].close }

child_stdout, child_stderr = pipes[0][0], pipes[1][0]

timeout = opts[:timeout] ? Float(opts[:timeout]) : nil

# Holds data read thus far
child_stdio_bufs = {
child_stdout => "",
child_stderr => "",
}

active = nil
watched = child_stdio_bufs.keys
start = Time.now

while !watched.empty? &&
(active = IO.select(watched, nil, watched, timeout))
active.flatten.each do |io|
begin
child_stdio_bufs[io] << io.read_nonblock(MAX_READLEN)
rescue IO::WaitReadable
# Wait for more data
rescue EOFError
watched.delete(io)
end
end

if timeout
now = Time.now
timeout -= now - start
start = now
end
end

ret = {
:stdout => child_stdio_bufs[child_stdout],
:stderr => child_stdio_bufs[child_stderr],
:timed_out => active.nil?,
:status => nil,
}

Process.kill("KILL", child_pid) if ret[:timed_out]

Process.waitpid(child_pid)

ret[:status] = $?

ret
ensure
pipes.each do |ios|
ios.each { |io| io.close unless io.closed? }
end
end

# Runs the supplied command and logs the exit status, stdout, and stderr.
#
# @see VCAP::Stager::ProcessRunner#run for a description of arguments and
# return value.
def run_logged(cmd, opts = {})
ret = run(cmd, opts)

exitstatus = ret[:status].exitstatus
@logger.debug("Command #{cmd} exited with status #{exitstatus}")
@logger.debug("stdout: #{ret[:stdout]}")
@logger.debug("stderr: #{ret[:stderr]}")

ret
end
end
106 changes: 84 additions & 22 deletions lib/vcap/stager/server.rb
Expand Up @@ -31,41 +31,57 @@ def close
end
end

def initialize(nats_uri, task_mgr, config={})
@nats_uri = nats_uri
@nats_conn = nil
@task_mgr = nil
@channels = []
@config = config
@task_mgr = task_mgr
@logger = VCAP::Logging.logger('vcap.stager.server')
def initialize(nats_uri, thread_pool, user_manager, config={})
@nats_uri = nats_uri
@nats_conn = nil
@channels = []
@config = config
@task_config = create_task_config(config, user_manager)
@logger = VCAP::Logging.logger('vcap.stager.server')
@thread_pool = thread_pool
@user_manager = user_manager
@shutdown_thread = nil
end

def run
install_error_handlers()
install_signal_handlers()
install_error_handlers

install_signal_handlers

@thread_pool.start

EM.run do
@nats_conn = NATS.connect(:uri => @nats_uri) do
VCAP::Stager::Task.set_defaults(:nats => @nats_conn)
VCAP::Component.register(:type => 'Stager',
:index => @config[:index],
:host => VCAP.local_ip(@config[:local_route]),
:config => @config,
:nats => @nats_conn)
setup_channels()
@task_mgr.varz = VCAP::Component.varz

setup_channels

@logger.info("Server running")
end
end
end

# Stops receiving new tasks, waits for existing tasks to finish, then stops.
#
# NB: This is called from a signal handler, so be sure to wrap all EM
# interaction with EM.next_tick.
def shutdown
@logger.info("Shutdown initiated, waiting for remaining #{@task_mgr.num_tasks} task(s) to finish")
@channels.each {|c| c.close }
@task_mgr.on_idle do
@logger.info("All tasks completed. Exiting!")
EM.stop
num_tasks = @thread_pool.num_active_tasks + @thread_pool.num_queued_tasks
@logger.info("Shutdown initiated.")
@logger.info("Waiting for remaining #{num_tasks} task(s) to finish.")

@channels.each do |c|
EM.next_tick { c.close }
end

@shutdown_thread = Thread.new do
# Blocks until all threads have finished
@thread_pool.shutdown
EM.next_tick { EM.stop }
end
end

Expand All @@ -86,8 +102,8 @@ def install_error_handlers
end

def install_signal_handlers
trap('USR2') { shutdown() }
trap('INT') { shutdown() }
trap('USR2') { shutdown }
trap('INT') { shutdown }
end

def setup_channels
Expand All @@ -101,12 +117,58 @@ def setup_channels
def add_task(task_msg)
begin
@logger.debug("Decoding task '#{task_msg}'")
task = VCAP::Stager::Task.decode(task_msg)

request = Yajl::Parser.parse(task_msg)
rescue => e
@logger.warn("Failed decoding '#{task_msg}': #{e}")
@logger.warn(e)
return
end
@task_mgr.add_task(task)

@thread_pool.enqueue { execute_request(request) }

@logger.info("Enqueued request #{request}")

nil
end

def execute_request(request)
task = VCAP::Stager::Task.new(request, @task_config)

result = nil
begin
task.perform

result = VCAP::Stager::TaskResult.new(task.task_id, task.log)

@logger.info("Task #{task.task_id} succeeded")
rescue VCAP::Stager::TaskError => te
@logger.warn("Task #{task.task_id} failed: #{te}")

result = VCAP::Stager::TaskResult.new(task.task_id, task.log, te)
rescue Exception => e
@logger.error("Unexpected exception: #{e}")
@logger.error(e)

raise e
end

EM.next_tick { @nats_conn.publish(request["notify_subj"], result.encode) }

nil
end

def create_task_config(server_config, user_manager)
task_config = {
:ruby_path => server_config[:ruby_path],
:run_plugin_path => server_config[:run_plugin_path],
:secure_user_manager => user_manager,
}

if server_config[:dirs]
task_config[:manifest_root] = server_config[:dirs][:manifests]
end

task_config
end
end

0 comments on commit 6a5a9d0

Please sign in to comment.