From 6a5a9d0bc003d5f09eecc2c674d18a576d81a755 Mon Sep 17 00:00:00 2001 From: mpage Date: Mon, 23 Apr 2012 12:02:48 -0700 Subject: [PATCH] Convert stager from fibers to threads There are a couple of reasons for this change. Firstly, we don't need a massive amount of concurrency. Threads will serve us well enough and the resulting code is easier to read, maintain, and reason about. Secondly, this will ease the transition to using Warden containers for staging applications. I also took the opportunity to clean up some tests as I was refactoring. Test plan: - Unit tests pass (and are expanded). - BVTs pass - Manual testing using a script that executes parallel deploys succeeds. Change-Id: Ibb883a588de59d5d551bbd8e03261c45dd924580 --- Gemfile | 1 + Gemfile.lock | 2 + bin/stager | 7 +- lib/vcap/stager.rb | 4 +- lib/vcap/stager/process_runner.rb | 99 +++++++ lib/vcap/stager/server.rb | 106 +++++-- lib/vcap/stager/task.rb | 461 +++++++++++++---------------- lib/vcap/stager/task_error.rb | 4 + lib/vcap/stager/workspace.rb | 35 +++ spec/functional/stager_spec.rb | 84 ++---- spec/functional/task_spec.rb | 91 ++++++ spec/support/dummy_http_handler.rb | 36 +++ spec/support/util.rb | 16 + spec/unit/process_runner_spec.rb | 44 +++ spec/unit/task_result_spec.rb | 4 +- spec/unit/task_spec.rb | 117 -------- spec/unit/workspace_spec.rb | 32 ++ 17 files changed, 677 insertions(+), 466 deletions(-) create mode 100644 lib/vcap/stager/process_runner.rb create mode 100644 lib/vcap/stager/workspace.rb create mode 100644 spec/functional/task_spec.rb create mode 100644 spec/support/dummy_http_handler.rb create mode 100644 spec/support/util.rb create mode 100644 spec/unit/process_runner_spec.rb delete mode 100644 spec/unit/task_spec.rb create mode 100644 spec/unit/workspace_spec.rb diff --git a/Gemfile b/Gemfile index 9278eea..6556a16 100644 --- a/Gemfile +++ b/Gemfile @@ -8,6 +8,7 @@ gem 'yajl-ruby', '>= 0.7.9' gem 'vcap_common', '~> 1.0.8' gem 'vcap_logging', '>= 0.1.3' gem 'vcap_staging', '~> 0.1.52' +gem 'vcap-concurrency', '~> 0.0.1' group :test do gem 'rspec' diff --git a/Gemfile.lock b/Gemfile.lock index 81c6a61..2bcf3da 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -36,6 +36,7 @@ GEM rack (>= 1.0.0) tilt (1.3.2) uuidtools (2.1.2) + vcap-concurrency (0.0.1) vcap_common (1.0.9) eventmachine (~> 0.12.11.cloudfoundry.3) nats (~> 0.4.22.beta.8) @@ -66,6 +67,7 @@ DEPENDENCIES rake rspec sinatra + vcap-concurrency (~> 0.0.1) vcap_common (~> 1.0.8) vcap_logging (>= 0.1.3) vcap_staging (~> 0.1.52) diff --git a/bin/stager b/bin/stager index 20cb9a5..cebf0e1 100755 --- a/bin/stager +++ b/bin/stager @@ -8,6 +8,7 @@ $LOAD_PATH.unshift(File.expand_path('../../lib', __FILE__)) require 'vcap/common' require 'vcap/component' +require 'vcap/concurrency' require 'vcap/stager' sleep_interval = 1 @@ -51,8 +52,8 @@ VCAP::Stager.init(config) user_mgr=nil if config[:secure] VCAP::Stager::SecureUserManager.instance.setup - user_mgr = VCAP::Stager::SecureUserManager.instance + user_mgr = VCAP::Concurrency::Proxy.new(VCAP::Stager::SecureUserManager.instance) end -task_mgr = VCAP::Stager::TaskManager.new(config[:max_active_tasks], user_mgr) -server = VCAP::Stager::Server.new(config[:nats_uri], task_mgr, config) +thread_pool = VCAP::Concurrency::ThreadPool.new(config[:max_active_tasks]) +server = VCAP::Stager::Server.new(config[:nats_uri], thread_pool, user_mgr, config) server.run diff --git a/lib/vcap/stager.rb b/lib/vcap/stager.rb index 49858d1..19cde43 100644 --- a/lib/vcap/stager.rb +++ b/lib/vcap/stager.rb @@ -1,5 +1,6 @@ require 'vcap/stager/constants' require 'vcap/stager/config' +require 'vcap/stager/process_runner' require 'vcap/stager/server' require 'vcap/stager/task' require 'vcap/stager/task_error' @@ -8,6 +9,7 @@ require 'vcap/stager/task_result' require 'vcap/stager/util' require 'vcap/stager/version' +require 'vcap/stager/workspace' module VCAP module Stager @@ -20,8 +22,6 @@ def init(config) StagingPlugin.manifest_root = config[:dirs][:manifests] StagingPlugin.load_all_manifests StagingPlugin.validate_configuration! - VCAP::Stager::Task.set_defaults(config) - VCAP::Stager::Task.set_defaults({:manifest_dir => config[:dirs][:manifests]}) end end end diff --git a/lib/vcap/stager/process_runner.rb b/lib/vcap/stager/process_runner.rb new file mode 100644 index 0000000..e3f03d9 --- /dev/null +++ b/lib/vcap/stager/process_runner.rb @@ -0,0 +1,99 @@ +module VCAP + module Stager + end +end + +class VCAP::Stager::ProcessRunner + MAX_READLEN = 1024 * 1024 + + def initialize(logger) + @logger = logger + end + + # Runs a command and captures stdout/stderr/status + # + # @param [String] cmd The command to run. + # @param [Hash] opts + # @option opts [Integer] :timeout How long the process is allowed to run for + # + # @return [Hash] A hash with the following keys: + # :stdout => String + # :stderr => String + # :timed_out => Boolean + # :status => Process::Status + def run(cmd, opts = {}) + pipes = [IO.pipe, IO.pipe] + + child_pid = Process.spawn(cmd, :out => pipes[0][1], :err => pipes[1][1]) + + # Only need the read side in parent + pipes.each { |ios| ios[1].close } + + child_stdout, child_stderr = pipes[0][0], pipes[1][0] + + timeout = opts[:timeout] ? Float(opts[:timeout]) : nil + + # Holds data read thus far + child_stdio_bufs = { + child_stdout => "", + child_stderr => "", + } + + active = nil + watched = child_stdio_bufs.keys + start = Time.now + + while !watched.empty? && + (active = IO.select(watched, nil, watched, timeout)) + active.flatten.each do |io| + begin + child_stdio_bufs[io] << io.read_nonblock(MAX_READLEN) + rescue IO::WaitReadable + # Wait for more data + rescue EOFError + watched.delete(io) + end + end + + if timeout + now = Time.now + timeout -= now - start + start = now + end + end + + ret = { + :stdout => child_stdio_bufs[child_stdout], + :stderr => child_stdio_bufs[child_stderr], + :timed_out => active.nil?, + :status => nil, + } + + Process.kill("KILL", child_pid) if ret[:timed_out] + + Process.waitpid(child_pid) + + ret[:status] = $? + + ret + ensure + pipes.each do |ios| + ios.each { |io| io.close unless io.closed? } + end + end + + # Runs the supplied command and logs the exit status, stdout, and stderr. + # + # @see VCAP::Stager::ProcessRunner#run for a description of arguments and + # return value. + def run_logged(cmd, opts = {}) + ret = run(cmd, opts) + + exitstatus = ret[:status].exitstatus + @logger.debug("Command #{cmd} exited with status #{exitstatus}") + @logger.debug("stdout: #{ret[:stdout]}") + @logger.debug("stderr: #{ret[:stderr]}") + + ret + end +end diff --git a/lib/vcap/stager/server.rb b/lib/vcap/stager/server.rb index 986d77f..f81dc7e 100644 --- a/lib/vcap/stager/server.rb +++ b/lib/vcap/stager/server.rb @@ -31,41 +31,57 @@ def close end end - def initialize(nats_uri, task_mgr, config={}) - @nats_uri = nats_uri - @nats_conn = nil - @task_mgr = nil - @channels = [] - @config = config - @task_mgr = task_mgr - @logger = VCAP::Logging.logger('vcap.stager.server') + def initialize(nats_uri, thread_pool, user_manager, config={}) + @nats_uri = nats_uri + @nats_conn = nil + @channels = [] + @config = config + @task_config = create_task_config(config, user_manager) + @logger = VCAP::Logging.logger('vcap.stager.server') + @thread_pool = thread_pool + @user_manager = user_manager + @shutdown_thread = nil end def run - install_error_handlers() - install_signal_handlers() + install_error_handlers + + install_signal_handlers + + @thread_pool.start + EM.run do @nats_conn = NATS.connect(:uri => @nats_uri) do - VCAP::Stager::Task.set_defaults(:nats => @nats_conn) VCAP::Component.register(:type => 'Stager', :index => @config[:index], :host => VCAP.local_ip(@config[:local_route]), :config => @config, :nats => @nats_conn) - setup_channels() - @task_mgr.varz = VCAP::Component.varz + + setup_channels + @logger.info("Server running") end end end # Stops receiving new tasks, waits for existing tasks to finish, then stops. + # + # NB: This is called from a signal handler, so be sure to wrap all EM + # interaction with EM.next_tick. def shutdown - @logger.info("Shutdown initiated, waiting for remaining #{@task_mgr.num_tasks} task(s) to finish") - @channels.each {|c| c.close } - @task_mgr.on_idle do - @logger.info("All tasks completed. Exiting!") - EM.stop + num_tasks = @thread_pool.num_active_tasks + @thread_pool.num_queued_tasks + @logger.info("Shutdown initiated.") + @logger.info("Waiting for remaining #{num_tasks} task(s) to finish.") + + @channels.each do |c| + EM.next_tick { c.close } + end + + @shutdown_thread = Thread.new do + # Blocks until all threads have finished + @thread_pool.shutdown + EM.next_tick { EM.stop } end end @@ -86,8 +102,8 @@ def install_error_handlers end def install_signal_handlers - trap('USR2') { shutdown() } - trap('INT') { shutdown() } + trap('USR2') { shutdown } + trap('INT') { shutdown } end def setup_channels @@ -101,12 +117,58 @@ def setup_channels def add_task(task_msg) begin @logger.debug("Decoding task '#{task_msg}'") - task = VCAP::Stager::Task.decode(task_msg) + + request = Yajl::Parser.parse(task_msg) rescue => e @logger.warn("Failed decoding '#{task_msg}': #{e}") @logger.warn(e) return end - @task_mgr.add_task(task) + + @thread_pool.enqueue { execute_request(request) } + + @logger.info("Enqueued request #{request}") + + nil + end + + def execute_request(request) + task = VCAP::Stager::Task.new(request, @task_config) + + result = nil + begin + task.perform + + result = VCAP::Stager::TaskResult.new(task.task_id, task.log) + + @logger.info("Task #{task.task_id} succeeded") + rescue VCAP::Stager::TaskError => te + @logger.warn("Task #{task.task_id} failed: #{te}") + + result = VCAP::Stager::TaskResult.new(task.task_id, task.log, te) + rescue Exception => e + @logger.error("Unexpected exception: #{e}") + @logger.error(e) + + raise e + end + + EM.next_tick { @nats_conn.publish(request["notify_subj"], result.encode) } + + nil + end + + def create_task_config(server_config, user_manager) + task_config = { + :ruby_path => server_config[:ruby_path], + :run_plugin_path => server_config[:run_plugin_path], + :secure_user_manager => user_manager, + } + + if server_config[:dirs] + task_config[:manifest_root] = server_config[:dirs][:manifests] + end + + task_config end end diff --git a/lib/vcap/stager/task.rb b/lib/vcap/stager/task.rb index 45e0bb0..c9683d7 100644 --- a/lib/vcap/stager/task.rb +++ b/lib/vcap/stager/task.rb @@ -1,17 +1,14 @@ -require 'fiber' -require 'fileutils' -require 'nats/client' -require 'tmpdir' -require 'uri' -require 'yajl' - -require 'vcap/common' -require 'vcap/logging' -require 'vcap/staging/plugin/common' - -require 'vcap/stager/constants' -require 'vcap/stager/task_error' -require 'vcap/stager/task_result' +require "nats/client" +require "uri" +require "yajl" + +require "vcap/common" +require "vcap/logging" +require "vcap/stager/process_runner" +require "vcap/stager/task_error" +require "vcap/stager/task_logger" +require "vcap/stager/workspace" +require "vcap/staging/plugin/common" module VCAP module Stager @@ -19,287 +16,245 @@ module Stager end class VCAP::Stager::Task - DEFAULTS = { - :nats => NATS, - :manifest_dir => StagingPlugin::DEFAULT_MANIFEST_ROOT, - :max_staging_duration => 120, # 2 min - :download_app_helper_path => File.join(VCAP::Stager::BIN_DIR, 'download_app'), - :upload_droplet_helper_path => File.join(VCAP::Stager::BIN_DIR, 'upload_droplet'), - } - - class << self - def decode(msg) - dec_msg = Yajl::Parser.parse(msg) - VCAP::Stager::Task.new(dec_msg['app_id'], - dec_msg['properties'], - dec_msg['download_uri'], - dec_msg['upload_uri'], - dec_msg['notify_subj']) - end + MAX_STAGING_DURATION = 120 + RUN_PLUGIN_PATH = File.expand_path('../../../../bin/run_plugin', __FILE__) - def set_defaults(defaults={}) - DEFAULTS.update(defaults) - end - end + attr_reader :task_id - attr_reader :task_id, :app_id, :result - attr_accessor :user - - # @param app_id Integer Globally unique id for app - # @param props Hash App properties. Keys are - # :runtime => Application runtime name - # :framework => Application framework name - # :environment => Applications environment variables. - # Hash of NAME => VALUE - # :services => Services bound to app - # :resources => Resource limits - # @param download_uri String Where the stager can fetch the zipped application from. - # @param upload_uri String Where the stager should PUT the gzipped droplet - # @param notify_subj String NATS subject that the stager will publish the result to. - def initialize(app_id, props, download_uri, upload_uri, notify_subj, opts={}) + def initialize(request, opts = {}) + @nats = opts[:nats] || NATS @task_id = VCAP.secure_uuid - @app_id = app_id - @app_props = props - @download_uri = download_uri - @upload_uri = upload_uri - @notify_subj = notify_subj - - @vcap_logger = VCAP::Logging.logger('vcap.stager.task') - @nats = option(opts, :nats) - @max_staging_duration = option(opts, :max_staging_duration) - @run_plugin_path = option(opts, :run_plugin_path) - @ruby_path = option(opts, :ruby_path) - @manifest_dir = option(opts, :manifest_dir) - @tmpdir_base = opts[:tmpdir] - @user = opts[:user] - @download_app_helper_path = option(opts, :download_app_helper_path) - @upload_droplet_helper_path = option(opts, :upload_droplet_helper_path) + @logger = VCAP::Logging.logger("vcap.stager.task") + @task_logger = VCAP::Stager::TaskLogger.new(@logger) + @request = request + @user_manager = opts[:secure_user_manager] + @runner = opts[:runner] || VCAP::Stager::ProcessRunner.new(@logger) + @manifest_dir = opts[:manifest_root] || StagingPlugin::DEFAULT_MANIFEST_ROOT + @ruby_path = opts[:ruby_path] || "ruby" + @run_plugin_path = opts[:run_plugin_path] || RUN_PLUGIN_PATH + @max_staging_duration = opts[:max_staging_duration] || MAX_STAGING_DURATION end - # Performs the staging task, calls the supplied callback upon completion. - # - # NB: We use a fiber internally to avoid descending into callback hell. This - # method could easily end up looking like the following: - # create_staging_dirs do - # download_app do - # unzip_app do - # etc... - # - # @param callback Block Block to be called when the task completes (upon both success - # and failure). This will be called with an instance of VCAP::Stager::TaskResult - def perform(&callback) - Fiber.new do - begin - task_logger = VCAP::Stager::TaskLogger.new(@vcap_logger) - task_logger.info("Starting staging operation") - @vcap_logger.debug("app_id=#{@app_id}, properties=#{@app_props}") - @vcap_logger.debug("download_uri=#{@download_uri} upload_uri=#{@upload_uri} notify_sub=#{@notify_subj}") - - task_logger.info("Setting up temporary directories") - dirs = create_staging_dirs(@tmpdir_base) - - task_logger.info("Fetching application bits from the Cloud Controller") - download_app(dirs[:unstaged], dirs[:base]) - - task_logger.info("Staging application") - run_staging_plugin(dirs[:unstaged], dirs[:staged], dirs[:base], task_logger) - - task_logger.info("Uploading droplet") - upload_droplet(dirs[:staged], dirs[:base]) - - task_logger.info("Done!") - @result = VCAP::Stager::TaskResult.new(@task_id, task_logger.public_log) - @nats.publish(@notify_subj, @result.encode) - callback.call(@result) - - rescue VCAP::Stager::TaskError => te - task_logger.error("Error: #{te}") - @result = VCAP::Stager::TaskResult.new(@task_id, task_logger.public_log, te) - @nats.publish(@notify_subj, @result.encode) - callback.call(@result) - - rescue => e - @vcap_logger.error("Unrecoverable error: #{e}") - @vcap_logger.error(e) - err = VCAP::Stager::InternalError.new - @result = VCAP::Stager::TaskResult.new(@task_id, task_logger.public_log, err) - @nats.publish(@notify_subj, @result.encode) - raise e - - ensure - EM.system("rm -rf #{dirs[:base]}") if dirs - end - - end.resume + def log + @task_logger.public_log end - def encode - h = { - :app_id => @app_id, - :properties => @app_props, - :download_uri => @download_uri, - :upload_uri => @upload_uri, - :notify_subj => @notify_subj, - } - Yajl::Encoder.encode(h) + def enqueue(queue) + @nats.publish("vcap.stager.#{queue}", Yajl::Encoder.encode(@request)) end - def enqueue(queue) - @nats.publish("vcap.stager.#{queue}", encode()) + # Attempts to stage the application and upload the result to the specified + # endpoint. + def perform + @logger.info("Starting task for request: #{@request}") + + @task_logger.info("Setting up temporary directories") + workspace = VCAP::Stager::Workspace.create + + @task_logger.info("Downloading application") + app_path = File.join(workspace.root_dir, "app.zip") + download_app(app_path) + + @task_logger.info("Unpacking application") + unpack_app(app_path, workspace.unstaged_dir) + + @task_logger.info("Staging application") + stage_app(workspace.unstaged_dir, workspace.staged_dir, @task_logger) + + @task_logger.info("Creating droplet") + droplet_path = File.join(workspace.root_dir, "droplet.tgz") + create_droplet(workspace.staged_dir, droplet_path) + + @task_logger.info("Uploading droplet") + upload_droplet(droplet_path) + + @task_logger.info("Done!") + + nil + + ensure + workspace.destroy if workspace end private - def option(hash, key) - if hash.has_key?(key) - hash[key] - else - DEFAULTS[key] + # NB: We use curl here to avoid putting Ruby's GC on the data path. + def download_app(app_path) + cfg_file = Tempfile.new("curl_dl_config") + + write_curl_config(@request["download_uri"], cfg_file.path, + "output" => app_path) + + # Show errors but not progress, fail on non-200 + res = @runner.run_logged("curl -s -S -f -K #{cfg_file.path}") + + unless res[:status].success? + raise VCAP::Stager::TaskError.new("Failed downloading app") end + + nil + ensure + cfg_file.unlink if cfg_file end - # Creates a temporary directory with needed layout for staging, along - # with the correct permissions - # - # @param tmpdir_base String If supplied, the temporary directory will be created under this - # - # @return Hash :base => Base temporary directory - # :unstaged => Unstaged app dir - # :staged => Staged app dir - def create_staging_dirs(tmpdir_base=nil) - # Created with mode 0700 by default - ret = {:base => Dir.mktmpdir(nil, tmpdir_base)} - - @vcap_logger.debug("Created base staging dir at #{ret[:base]}") - - for dir_name in ['unstaged', 'staged'] - dir = File.join(ret[:base], dir_name) - FileUtils.mkdir(dir, :mode => 0700) - ret[dir_name.to_sym] = dir - @vcap_logger.debug("Created #{dir_name} dir at #{dir}") - end + def unpack_app(packed_app_path, dst_dir) + res = @runner.run_logged("unzip -q #{packed_app_path} -d #{dst_dir}") - ret + unless res[:status].success? + raise VCAP::Stager::TaskError.new("Failed unpacking app") + end end - # Downloads the zipped application at @download_uri, unzips it, and stores it - # in dst_dir. - # - # NB: We write the url to a file that only we can read in order to avoid - # exposing auth information. This actually shells out to a helper script in - # order to avoid putting long running code (the stager) on the data - # path. We are sacrificing performance for reliability here... - # - # @param dst_dir String Where to store the downloaded app - # @param tmp_dir String - def download_app(dst_dir, tmp_dir) - uri_path = File.join(tmp_dir, 'stager_dl_uri') - zipped_app_path = File.join(tmp_dir, 'app.zip') - - File.open(uri_path, 'w+') {|f| f.write(@download_uri) } - cmd = "#{@download_app_helper_path} #{uri_path} #{zipped_app_path}" - res = run_logged(cmd) - unless res[:success] - @vcap_logger.error("Failed downloading app from '#{@download_uri}'") - raise VCAP::Stager::AppDownloadError + # Stages the application into the supplied directory. + def stage_app(src_dir, dst_dir, task_logger) + plugin_config = { + "source_dir" => src_dir, + "dest_dir" => dst_dir, + "environment" => @request["properties"], + "manifest_dir" => @manifest_dir, + } + + secure_user = nil + if @user_manager + secure_user = @user_manager.checkout_user + + plugin_config["secure_user"] = { + "uid" => secure_user[:uid], + "gid" => secure_user[:gid], + } end - res = run_logged("unzip -q #{zipped_app_path} -d #{dst_dir}") - unless res[:success] - raise VCAP::Stager::AppUnzipError + plugin_config_file = Tempfile.new("plugin_config") + StagingPlugin::Config.to_file(plugin_config, plugin_config_file.path) + + cmd = [@ruby_path, @run_plugin_path, + @request["properties"]["framework"], + plugin_config_file.path].join(" ") + + res = @runner.run_logged(cmd, + :max_staging_duration => @max_staging_duration) + + capture_staging_log(dst_dir, task_logger) + + # Staging failed, log the error and abort + unless res[:status].success? + emsg = nil + if res[:timed_out] + emsg = "Staging timed out after #{@max_staging_duration} seconds." + else + emsg = "Staging plugin failed: #{res[:stdout]}" + end + + task_logger.warn(emsg) + + raise VCAP::Stager::TaskError.new(emsg) end + nil ensure - FileUtils.rm_f(uri_path) - FileUtils.rm_f(zipped_app_path) + plugin_config_file.unlink if plugin_config_file + + return_secure_user(secure_user) if secure_user end - # Stages our app into dst_dir, looking for the app source in src_dir - # - # @param src_dir String Location of the unstaged app - # @param dst_dir String Where to place the staged app - # @param work_dir String Directory to use to place scratch files - # @param task_logger VCAP::Stager::TaskLogger - def run_staging_plugin(src_dir, dst_dir, work_dir, task_logger) - plugin_config = { - 'source_dir' => src_dir, - 'dest_dir' => dst_dir, - 'environment' => @app_props, - } - plugin_config['secure_user'] = {'uid' => @user[:uid], 'gid' => @user[:gid]} if @user - plugin_config['manifest_dir'] = @manifest_dir if @manifest_dir - plugin_config_path = File.join(work_dir, 'plugin_config.yaml') - StagingPlugin::Config.to_file(plugin_config, plugin_config_path) - cmd = "#{@ruby_path} #{@run_plugin_path} #{@app_props['framework']} #{plugin_config_path}" - - @vcap_logger.debug("Running staging command: '#{cmd}'") - res = run_logged(cmd, 0, @max_staging_duration) - - # Slurp in the plugin log - plugin_log = File.join(dst_dir, 'logs', 'staging.log') - if File.exist?(plugin_log) - File.open(plugin_log, 'r') do |plf| - begin - while line = plf.readline - line.chomp! - task_logger.info(line) - end - rescue EOFError - end - end + def create_droplet(staged_dir, droplet_path) + cmd = ["cd", staged_dir, "&&", "COPYFILE_DISABLE=true", + "tar", "-czf", droplet_path, "*"].join(" ") + + res = @runner.run_logged(cmd) + + unless res[:status].success? + raise VCAP::Stager::TaskError.new("Failed creating droplet") end + end + + def upload_droplet(droplet_path) + cfg_file = Tempfile.new("curl_ul_config") + + write_curl_config(@request["upload_uri"], cfg_file.path, + "form" => "upload[droplet]=@#{droplet_path}") - if res[:timed_out] - @vcap_logger.error("Staging timed out") - raise VCAP::Stager::StagingTimeoutError - elsif !res[:success] - @vcap_logger.error("Staging plugin exited with status '#{res[:status]}'") - raise VCAP::Stager::StagingPluginError, "#{res[:stderr]}" + # Show errors but not progress, fail on non-200 + res = @runner.run_logged("curl -s -S -f -K #{cfg_file.path}") + + unless res[:status].success? + raise VCAP::Stager::TaskError.new("Failed uploading droplet") end + + nil ensure - FileUtils.rm_f(plugin_config_path) if plugin_config_path + cfg_file.unlink if cfg_file end - # Packages and uploads the droplet in staged_dir + # Writes out a curl config to the supplied path. This allows us to use + # authenticated urls without potentially leaking them via the command line. # - # NB: See download_app for an explanation of why we shell out here... + # @param [String] url The url being fetched/updated. + # @param [String] config_path Where to write the config + # @param [Hash] opts A list of key-value curl options. These will + # be written to the config file as: + # = ""\n # - # @param staged_dir String - def upload_droplet(staged_dir, tmp_dir) - droplet_path = File.join(tmp_dir, 'droplet.tgz') - cmd = "cd #{staged_dir} && COPYFILE_DISABLE=true tar -czf #{droplet_path} *" - res = run_logged(cmd) - unless res[:success] - raise VCAP::Stager::DropletCreationError + # @return nil + def write_curl_config(url, config_path, opts = {}) + parsed_url = URI.parse(url) + + config = opts.dup + + if parsed_url.user + config["user"] = [parsed_url.user, parsed_url.password].join(":") + parsed_url.user = nil + parsed_url.password = nil end - uri_path = File.join(tmp_dir, 'stager_ul_uri') - File.open(uri_path, 'w+') {|f| f.write(@upload_uri); f.path } + config["url"] = parsed_url.to_s - cmd = "#{@upload_droplet_helper_path} #{uri_path} #{droplet_path}" - res = run_logged(cmd) - unless res[:success] - @vcap_logger.error("Failed uploading app to '#{@upload_uri}'") - raise VCAP::Stager::DropletUploadError + File.open(config_path, "w+") do |f| + config.each do |k, v| + f.write("#{k} = \"#{v}\"\n") + end end - ensure - FileUtils.rm_f(droplet_path) if droplet_path - FileUtils.rm_f(uri_path) if uri_path + + nil end - # Runs a command, logging the result at the debug level on success, or error level - # on failure. See VCAP::Stager::Util for a description of the arguments. - def run_logged(command, expected_exitstatus=0, timeout=nil) - f = Fiber.current - VCAP::Stager::Util.run_command(command, expected_exitstatus, timeout) {|res| f.resume(res) } - ret = Fiber.yield + # Appends the staging log (if any) to the user visible log + def capture_staging_log(staged_dir, task_logger) + staging_log_path = File.join(staged_dir, "logs", "staging.log") - level = ret[:success] ? :debug : :error - @vcap_logger.send(level, "Command '#{command}' exited with status='#{ret[:status]}', timed_out=#{ret[:timed_out]}") - @vcap_logger.send(level, "stdout: #{ret[:stdout]}") if ret[:stdout] != '' - @vcap_logger.send(level, "stderr: #{ret[:stderr]}") if ret[:stderr] != '' + return unless File.exist?(staging_log_path) + + File.open(staging_log_path, "r") do |sl| + begin + while line = sl.readline + line.chomp! + task_logger.info(line) + end + rescue EOFError + end + end - ret + nil end + # Returns a secure user to the pool and kills any processes belonging to + # said user. + def return_secure_user(user) + @logger.info("Returning user #{user} to pool") + + cmd = "sudo -u '##{user[:uid]}' pkill -9 -U #{user[:uid]}" + kres = @runner.run_logged(cmd) + # 0 : >=1 process matched + # 1 : no process matched + # 2 : error + if kres[:status].exitstatus < 2 + @user_manager.return_user(user) + + true + else + @logger.warn("Failed killing processes for user #{user}") + + false + end + end end diff --git a/lib/vcap/stager/task_error.rb b/lib/vcap/stager/task_error.rb index bd59de0..f5c1361 100644 --- a/lib/vcap/stager/task_error.rb +++ b/lib/vcap/stager/task_error.rb @@ -19,6 +19,10 @@ class TaskError < StandardError class << self attr_reader :desc + def desc + @desc || "Staging task failed" + end + def set_desc(desc) @desc = desc end diff --git a/lib/vcap/stager/workspace.rb b/lib/vcap/stager/workspace.rb new file mode 100644 index 0000000..c7b0fa6 --- /dev/null +++ b/lib/vcap/stager/workspace.rb @@ -0,0 +1,35 @@ +require "tmpdir" + +module VCAP + module Stager + end +end + +# The scratch area used by the staging tasks. +class VCAP::Stager::Workspace + attr_reader :root_dir, # Root of the workspace + :unstaged_dir, # Home to the raw app bits + :staged_dir # Home to the modified app bits + + def self.create(tmpdir_base = nil) + ws = new + + ws.create_paths(tmpdir_base) + + ws + end + + def destroy + FileUtils.rm_rf(@root_dir) + end + + def create_paths(tmpdir_base = nil) + @root_dir = Dir.mktmpdir(nil, tmpdir_base) + + @unstaged_dir = File.join(@root_dir, "unstaged") + FileUtils.mkdir(@unstaged_dir, :mode => 0700) + + @staged_dir = File.join(@root_dir, "staged") + FileUtils.mkdir(@staged_dir, :mode => 0700) + end +end diff --git a/spec/functional/stager_spec.rb b/spec/functional/stager_spec.rb index f7c8ca3..b4b6e6d 100644 --- a/spec/functional/stager_spec.rb +++ b/spec/functional/stager_spec.rb @@ -1,35 +1,7 @@ -require 'sinatra/base' require 'spec_helper' require 'vcap/spec/forked_component/nats_server' -# Simple handler that serves zipped apps from the fixtures directory and -# handles uploads by storing the request body in a user supplied hash -class DummyHandler < Sinatra::Base - use Rack::Auth::Basic do |user, pass| - user == 'foo' && pass = 'sekret' - end - - get '/zipped_apps/:name' do - app_zip_path = File.join(settings.download_path, "#{params[:name]}.zip") - if File.exist?(app_zip_path) - File.read(app_zip_path) - else - [404, ":("] - end - end - - post '/droplets/:name' do - dest_path = File.join(settings.upload_path, params[:name] + '.tgz') - File.open(dest_path, 'w+') {|f| f.write(params[:upload][:droplet]) } - [200, "Success!"] - end - - get '/fail' do - [500, "Oh noes"] - end -end - describe VCAP::Stager do before :all do @task_timeout = ENV['VCAP_TEST_TASK_TIMEOUT'] || 10 @@ -50,8 +22,7 @@ class DummyHandler < Sinatra::Base before :each do @tmp_dirs = create_tmp_dirs @uploads = {} - @http_server = start_http_server(@tmp_dirs[:upload], @tmp_dirs[:download], @tmp_dirs[:http]) - @http_port = @http_server.port + @http_server = start_http_server(@tmp_dirs[:http], @tmp_dirs) @nats_server = start_nats(@tmp_dirs[:nats]) @stager = start_stager(@nats_server.port, StagingPlugin.manifest_root, @@ -71,24 +42,27 @@ class DummyHandler < Sinatra::Base describe 'on success' do it 'it should post a bundled droplet to the callback uri' do - app_id = 'zazzle' - app_name = 'sinatra_gemfile' - dl_uri = app_download_uri(app_name) - ul_uri = droplet_upload_uri(app_name) - subj = 'staging.result' + app_name = "sinatra_trivial" + zip_app(@tmp_dirs[:download], app_name) - # Wait for the stager to tell us it is done - task_result = wait_for_task_result(@nats_server.uri, - subj, - [app_id, @app_props, dl_uri, ul_uri, subj]) + request = { + "app_id" => "zazzle", + "properties" => @app_props, + "download_uri" => DummyHandler.app_download_uri(@http_server, app_name), + "upload_uri" => DummyHandler.droplet_upload_uri(@http_server, app_name), + "notify_subj" => "staging.result", + } + + task_result = wait_for_task_result(@nats_server.uri, request) + task_result.should_not be_nil task_result.was_success?.should be_true end end def create_tmp_dirs - tmp_dirs = {:base => Dir.mktmpdir} + tmp_dirs = { :base => Dir.mktmpdir("stager_functional_tests") } for d in [:upload, :download, :nats, :stager, :http] tmp_dirs[d] = File.join(tmp_dirs[:base], d.to_s) Dir.mkdir(tmp_dirs[d]) @@ -96,22 +70,6 @@ def create_tmp_dirs tmp_dirs end - def zip_app(dir, app_name) - app_source_dir = fixture_path('apps', app_name, 'source') - target_path = File.join(dir, "#{app_name}.zip") - VCAP::Subprocess.run("cd #{app_source_dir}; zip -q -y #{target_path} -r *") - target_path - end - - def start_http_server(upload_path, download_path, http_dir) - port = VCAP.grab_ephemeral_port - DummyHandler.set(:upload_path, upload_path) - DummyHandler.set(:download_path, download_path) - http_server = VCAP::Stager::Spec::ForkedHttpServer.new(DummyHandler, port, http_dir) - http_server.start.wait_ready.should be_true - http_server - end - def start_nats(nats_dir) port = VCAP.grab_ephemeral_port pid_file = File.join(nats_dir, 'nats.pid') @@ -136,24 +94,16 @@ def start_stager(nats_port, manifest_dir, stager_dir) stager end - def wait_for_task_result(nats_uri, subj, task_args) + def wait_for_task_result(nats_uri, request) task_result = nil NATS.start(:uri => nats_uri) do EM.add_timer(@task_timeout) { NATS.stop } - NATS.subscribe(subj) do |msg| + NATS.subscribe(request["notify_subj"]) do |msg| task_result = VCAP::Stager::TaskResult.decode(msg) NATS.stop end - VCAP::Stager::Task.new(*task_args).enqueue('staging') + VCAP::Stager::Task.new(request).enqueue('staging') end task_result end - - def app_download_uri(app_name) - "http://foo:sekret@127.0.0.1:#{@http_port}/zipped_apps/#{app_name}" - end - - def droplet_upload_uri(app_name) - "http://foo:sekret@127.0.0.1:#{@http_port}/droplets/#{app_name}" - end end diff --git a/spec/functional/task_spec.rb b/spec/functional/task_spec.rb new file mode 100644 index 0000000..ca8e283 --- /dev/null +++ b/spec/functional/task_spec.rb @@ -0,0 +1,91 @@ +require "spec_helper" + +describe VCAP::Stager::Task do + describe "#perform" do + before :each do + @work_dir = Dir.mktmpdir + @http_server = start_http_server(@work_dir) + end + + after :each do + @http_server.stop + FileUtils.rm_rf(@work_dir) + end + + it "should raise an error if the download fails" do + # Will 404 + request = { + "download_uri" => DummyHandler.app_download_uri(@http_server, "fake") + } + + expect_error(request, /Failed downloading/) + end + + it "should raise an error if unpacking the app fails" do + app_name = "invalid_app" + invalid_app_path = File.join(@work_dir, "#{app_name}.zip") + File.open(invalid_app_path, "w+") { |f| f.write("garbage") } + + request = create_request(app_name) + + expect_error(request, /Failed unpacking/) + end + + it "should raise an error if staging the application fails" do + app_name = "sinatra_trivial" + app_path = zip_app(@work_dir, app_name) + + # Framework/runtime mismatch. Web.xml will not be found + request = create_request(app_name, + "framework" => "spring", + "runtime" => "java") + + expect_error(request, /Staging plugin failed/) + end + + it "should raise an error if uploading the droplet fails" do + app_name = "sinatra_trivial" + app_path = zip_app(@work_dir, app_name) + + request = create_request(app_name) + # Auth will fail + request["upload_uri"] = "http://127.0.0.1:#{@http_port}" + + expect_error(request, /Failed uploading/) + end + + it "should return nil on success" do + app_name = "sinatra_trivial" + app_path = zip_app(@work_dir, app_name) + request = create_request(app_name) + task = VCAP::Stager::Task.new(request) + + task.perform.should be_nil + + File.exist?(File.join(@work_dir, "#{app_name}.tgz")).should be_true + end + end + + def expect_error(request, matcher) + task = VCAP::Stager::Task.new(request) + expect do + task.perform + end.to raise_error(matcher) + end + + def create_request(app_name, app_props = {}) + { "download_uri" => DummyHandler.app_download_uri(@http_server, app_name), + "upload_uri" => DummyHandler.droplet_upload_uri(@http_server, app_name), + "properties" => { + "framework" => "sinatra", + "runtime" => "ruby18", + "services" => [{}], + "resources" => { + "memory" => 128, + "disk" => 1024, + "fds" => 64, + } + }.merge(app_props), + } + end +end diff --git a/spec/support/dummy_http_handler.rb b/spec/support/dummy_http_handler.rb new file mode 100644 index 0000000..0d659c7 --- /dev/null +++ b/spec/support/dummy_http_handler.rb @@ -0,0 +1,36 @@ +require "sinatra/base" + +# Simple handler that serves zipped apps from the fixtures directory and +# handles uploads by storing the request body in a user supplied hash +class DummyHandler < Sinatra::Base + def self.app_download_uri(http_server, app_name) + "http://foo:sekret@127.0.0.1:#{http_server.port}/zipped_apps/#{app_name}" + end + + def self.droplet_upload_uri(http_server, app_name) + "http://foo:sekret@127.0.0.1:#{http_server.port}/droplets/#{app_name}" + end + + use Rack::Auth::Basic do |user, pass| + user == 'foo' && pass = 'sekret' + end + + get '/zipped_apps/:name' do + app_zip_path = File.join(settings.download_path, "#{params[:name]}.zip") + if File.exist?(app_zip_path) + File.read(app_zip_path) + else + [404, ":("] + end + end + + post '/droplets/:name' do + dest_path = File.join(settings.upload_path, params[:name] + '.tgz') + File.open(dest_path, 'w+') {|f| f.write(params[:upload][:droplet]) } + [200, "Success!"] + end + + get '/fail' do + [500, "Oh noes"] + end +end diff --git a/spec/support/util.rb b/spec/support/util.rb new file mode 100644 index 0000000..82e78d7 --- /dev/null +++ b/spec/support/util.rb @@ -0,0 +1,16 @@ +def zip_app(dir, app_name) + app_source_dir = fixture_path('apps', app_name, 'source') + target_path = File.join(dir, "#{app_name}.zip") + VCAP::Subprocess.run("cd #{app_source_dir}; zip -q -y #{target_path} -r *") + target_path +end + +def start_http_server(http_dir, opt_dirs = {}) + port = VCAP.grab_ephemeral_port + DummyHandler.set(:upload_path, opt_dirs[:upload] || http_dir) + DummyHandler.set(:download_path, opt_dirs[:download] || http_dir) + http_server = VCAP::Stager::Spec::ForkedHttpServer.new(DummyHandler, + port, http_dir) + http_server.start.wait_ready.should be_true + http_server +end diff --git a/spec/unit/process_runner_spec.rb b/spec/unit/process_runner_spec.rb new file mode 100644 index 0000000..90311a6 --- /dev/null +++ b/spec/unit/process_runner_spec.rb @@ -0,0 +1,44 @@ +require File.join(File.dirname(__FILE__), "spec_helper") + +require "logger" + +describe VCAP::Stager::ProcessRunner do + describe "#run" do + it "should return the correct exit status, stdout, and stderr" do + runner = VCAP::Stager::ProcessRunner.new(nil) + + ret = runner.run("sh -c 'echo foo; echo bar >&2; exit 2'") + + ret[:stdout].should == "foo\n" + ret[:stderr].should == "bar\n" + ret[:status].exitstatus.should == 2 + end + + it "should allow commands to be timed out" do + runner = VCAP::Stager::ProcessRunner.new(nil) + + ret = runner.run("sh -c 'echo foo; sleep 5'", :timeout => 0.25) + + ret[:stdout].should == "foo\n" + ret[:timed_out].should be_true + end + end + + describe "#run_logged" do + it "should log exit status, stdout, and stderr" do + log_buf = StringIO.new("") + logger = Logger.new(log_buf) + logger.level = Logger::DEBUG + logger.formatter = proc { |sev, dt, pn, msg| msg } + runner = VCAP::Stager::ProcessRunner.new(logger) + + runner.run_logged("sh -c 'echo foo; echo bar >&2; exit 2'") + + raw_buf = log_buf.string + + raw_buf.should match(/foo/) + raw_buf.should match(/bar/) + raw_buf.should match(/status 2/) + end + end +end diff --git a/spec/unit/task_result_spec.rb b/spec/unit/task_result_spec.rb index 9f2b4f0..f0bedfe 100644 --- a/spec/unit/task_result_spec.rb +++ b/spec/unit/task_result_spec.rb @@ -19,11 +19,11 @@ describe '.decode' do it 'should decode encoded task results' do - tr = VCAP::Stager::TaskResult.new('xxx', 'yyy', VCAP::Stager::AppDownloadError.new) + tr = VCAP::Stager::TaskResult.new('xxx', 'yyy', VCAP::Stager::TaskError.new) dec_tr = VCAP::Stager::TaskResult.decode(tr.encode) dec_tr.task_id.should == tr.task_id dec_tr.task_log.should == tr.task_log - dec_tr.error.class.should == VCAP::Stager::AppDownloadError + dec_tr.error.class.should == VCAP::Stager::TaskError end end end diff --git a/spec/unit/task_spec.rb b/spec/unit/task_spec.rb deleted file mode 100644 index b4d2aca..0000000 --- a/spec/unit/task_spec.rb +++ /dev/null @@ -1,117 +0,0 @@ -require File.join(File.dirname(__FILE__), 'spec_helper') - -require 'tmpdir' - -describe VCAP::Stager::Task do - describe '#creating_staging_dirs' do - it 'should create the basic directory structure needed for staging' do - task = VCAP::Stager::Task.new(nil, nil, nil, nil, nil) - dirs = task.send(:create_staging_dirs) - File.directory?(dirs[:base]).should be_true - File.directory?(dirs[:unstaged]).should be_true - File.directory?(dirs[:staged]).should be_true - FileUtils.rm_rf(dirs[:base]) if dirs[:base] - end - end - - describe '#download_app' do - before :each do - @tmp_dir = Dir.mktmpdir - @task = VCAP::Stager::Task.new(1, nil, nil, nil, nil) - end - - after :each do - FileUtils.rm_rf(@tmp_dir) - end - - it 'should raise an instance of VCAP::Stager::AppDownloadError if the download fails' do - @task.stub(:run_logged).and_return({:success => false}) - expect { @task.send(:download_app, @tmp_dir, @tmp_dir) }.to raise_error(VCAP::Stager::AppDownloadError) - end - - it 'should raise an instance of VCAP::Stager::AppUnzipError if the unzip fails' do - @task.stub(:run_logged).and_return({:success => true}, {:success => false}) - expect { @task.send(:download_app, @tmp_dir, @tmp_dir) }.to raise_error(VCAP::Stager::AppUnzipError) - end - - it 'should leave the temporary working dir as it found it' do - glob_exp = File.join(@tmp_dir, '*') - pre_files = Dir.glob(glob_exp) - @task.stub(:run_logged).and_return({:success => true}, {:success => true}) - @task.send(:download_app, @tmp_dir, @tmp_dir) - Dir.glob(glob_exp).should == pre_files - end - end - - describe '#run_staging_plugin' do - before :each do - @tmp_dir = Dir.mktmpdir - @props = { - 'runtime' => 'ruby', - 'framework' => 'sinatra', - 'services' => [{}], - 'resources' => { - 'memory' => 128, - 'disk' => 1024, - 'fds' => 64, - }, - } - @task = VCAP::Stager::Task.new(1, @props, nil, nil, nil) - end - - after :each do - FileUtils.rm_rf(@tmp_dir) - end - - it 'should raise an instance of VCAP::Stager::StagingTimeoutError on plugin timeout' do - @task.stub(:run_logged).and_return({:success => false, :timed_out => true}) - expect { @task.send(:run_staging_plugin, @tmp_dir, @tmp_dir, @tmp_dir, nil) }.to raise_error(VCAP::Stager::StagingTimeoutError) - end - - it 'should raise an instance of VCAP::Stager::StagingPlugin on plugin failure' do - @task.stub(:run_logged).and_return({:success => false}) - expect { @task.send(:run_staging_plugin, @tmp_dir, @tmp_dir, @tmp_dir, nil) }.to raise_error(VCAP::Stager::StagingPluginError) - end - - it 'should leave the temporary working dir as it found it' do - glob_exp = File.join(@tmp_dir, '*') - pre_files = Dir.glob(glob_exp) - @task.stub(:run_logged).and_return({:success => true}) - @task.send(:run_staging_plugin, @tmp_dir, @tmp_dir, @tmp_dir, nil) - Dir.glob(glob_exp).should == pre_files - end - end - - describe '#upload_app' do - before :each do - @tmp_dir = Dir.mktmpdir - @task = VCAP::Stager::Task.new(1, nil, nil, nil, nil) - end - - after :each do - FileUtils.rm_rf(@tmp_dir) - end - - it 'should raise an instance of VCAP::Stager::DropletCreationError if the gzip fails' do - @task.stub(:run_logged).and_return({:success => false}) - expect { @task.send(:upload_droplet, @tmp_dir, @tmp_dir) }.to raise_error(VCAP::Stager::DropletCreationError) - end - - it 'should raise an instance of VCAP::Stager::DropletUploadError if the upload fails' do - @task.stub(:run_logged).and_return({:success => true}, {:success => false}) - expect { @task.send(:upload_droplet, @tmp_dir, @tmp_dir) }.to raise_error(VCAP::Stager::DropletUploadError) - end - - it 'should leave the temporary working dir as it found it' do - glob_exp = File.join(@tmp_dir, '*') - pre_files = Dir.glob(glob_exp) - @task.stub(:run_logged).and_return({:success => true}, {:success => true}) - @task.send(:upload_droplet, @tmp_dir, @tmp_dir) - Dir.glob(glob_exp).should == pre_files - end - end - - - describe '#perform' do - end -end diff --git a/spec/unit/workspace_spec.rb b/spec/unit/workspace_spec.rb new file mode 100644 index 0000000..5d14b70 --- /dev/null +++ b/spec/unit/workspace_spec.rb @@ -0,0 +1,32 @@ +require File.join(File.dirname(__FILE__), "spec_helper") + +describe VCAP::Stager::Workspace do + before :each do + @ws_root = Dir.mktmpdir + end + + after :each do + FileUtils.rm_rf(@ws_root) + end + + describe ".create" do + it "should return a materialized workspace" do + ws = VCAP::Stager::Workspace.create(@ws_root) + + [:root_dir, :unstaged_dir, :staged_dir].each do |name| + File.directory?(ws.send(name)).should be_true + end + end + end + + describe "#destroy" do + it "should remove the workspace from the filesystem" do + ws = VCAP::Stager::Workspace.create(@ws_root) + ws.destroy + + [:root_dir, :unstaged_dir, :staged_dir].each do |name| + File.exist?(ws.send(name)).should be_false + end + end + end +end