diff --git a/cloud_controller/Gemfile b/cloud_controller/Gemfile index f362a347d..ad466df61 100644 --- a/cloud_controller/Gemfile +++ b/cloud_controller/Gemfile @@ -13,7 +13,7 @@ gem 'cf-uaa-client', '>= 0.0.8' # For queuing staging tasks gem 'em-hiredis' -gem 'vcap_stager', '~> 0.1.14' +gem 'stager-client', '~> 0.0.2' # Databases gem 'sqlite3' diff --git a/cloud_controller/Gemfile.lock b/cloud_controller/Gemfile.lock index ac3605be3..0a54e4514 100644 --- a/cloud_controller/Gemfile.lock +++ b/cloud_controller/Gemfile.lock @@ -121,6 +121,10 @@ GEM rack (~> 1.1) tilt (>= 1.2.2, < 2.0) sqlite3 (1.3.3) + stager-client (0.0.2) + eventmachine + nats + yajl-ruby thin (1.3.1) daemons (>= 1.0.9) eventmachine (>= 0.12.6) @@ -138,7 +142,6 @@ GEM thin (~> 1.3.1) yajl-ruby (~> 0.8.3) vcap_logging (1.0.1) - vcap_stager (0.1.14) vcap_staging (0.1.55) nokogiri (>= 1.4.4) rake @@ -175,10 +178,10 @@ DEPENDENCIES ruby-hmac (~> 0.4.0) sinatra sqlite3 + stager-client (~> 0.0.2) thin uuidtools (~> 2.1.2) vcap_common vcap_logging - vcap_stager (~> 0.1.14) vcap_staging (~> 0.1.55) yajl-ruby (~> 0.8.3) diff --git a/cloud_controller/app/controllers/apps_controller.rb b/cloud_controller/app/controllers/apps_controller.rb index 575e5ed2d..bdf53d726 100644 --- a/cloud_controller/app/controllers/apps_controller.rb +++ b/cloud_controller/app/controllers/apps_controller.rb @@ -1,4 +1,4 @@ -require 'staging_task_manager' +require "vcap/stager/client" class AppsController < ApplicationController before_filter :require_user, :except => [:download_staged] @@ -219,25 +219,36 @@ def files private def stage_app(app) - task_mgr = StagingTaskManager.new(:logger => CloudController.logger, - :timeout => AppConfig[:staging][:max_staging_runtime]) dl_uri = StagingController.download_app_uri(app) ul_hdl = StagingController.create_upload(app) - result = task_mgr.run_staging_task(app, dl_uri, ul_hdl.upload_uri) + client = VCAP::Stager::Client::FiberAware.new(NATS.client, + AppConfig[:staging][:queue]) - # Update run count to be consistent with previous staging code - if result.was_success? - CloudController.logger.debug("Staging task for app_id=#{app.id} succeded.", :tags => [:staging]) - CloudController.logger.debug1("Details: #{result.task_log}", :tags => [:staging]) + request = { + "app_id" => app.id, + "properties" => app.staging_task_properties, + "download_uri" => dl_uri, + "upload_uri" => ul_hdl.upload_uri, + } + + begin + result = client.stage(request, AppConfig[:staging][:max_staging_runtime]) + StagingTaskLog.new(app.id, result["task_log"]).save + rescue VCAP::Stager::Client::Error => e + result = { "error" => e.to_s } + end + + if result["error"] + CloudController.logger.warn("Staging for app_id=#{app.id} failed") + CloudController.logger.warn("Error: #{result["error"]}") + raise CloudError.new(CloudError::APP_STAGING_ERROR, result["error"]) + else + # Update run count to be consistent with previous staging code + CloudController.logger.info("Staging for app_id=#{app.id} succeeded") app.update_staged_package(ul_hdl.upload_path) app.package_state = 'STAGED' app.update_run_count() - else - CloudController.logger.warn("Staging task for app_id=#{app.id} failed: #{result.error}", - :tags => [:staging]) - CloudController.logger.debug1("Details: #{result.task_log}", :tags => [:staging]) - raise CloudError.new(CloudError::APP_STAGING_ERROR, result.error.to_s) end rescue => e diff --git a/cloud_controller/app/models/app_manager.rb b/cloud_controller/app/models/app_manager.rb index 35f937ece..35d054477 100644 --- a/cloud_controller/app/models/app_manager.rb +++ b/cloud_controller/app/models/app_manager.rb @@ -1,5 +1,3 @@ -require 'vcap/stager' - class AppManager attr_reader :app diff --git a/cloud_controller/app/models/staging_task_log.rb b/cloud_controller/app/models/staging_task_log.rb index ff6db47c7..f5a50628f 100644 --- a/cloud_controller/app/models/staging_task_log.rb +++ b/cloud_controller/app/models/staging_task_log.rb @@ -17,7 +17,7 @@ def fetch_fibered(app_id, redis=nil, timeout=5) get_def = redis.get(key) get_def.timeout(timeout) get_def.errback do |e| - e = VCAP::Stager::StagingTimeoutError.new("Timed out fetching result") if e == nil + e = CloudError.new(CloudError::SYSTEM_ERROR, "Timed out fetching result") if e == nil logger.error("Failed fetching result for key '#{key}': #{e}") logger.error(e) f.resume([false, e]) diff --git a/cloud_controller/config/appconfig.rb b/cloud_controller/config/appconfig.rb index 887150e6a..5a97e1d88 100644 --- a/cloud_controller/config/appconfig.rb +++ b/cloud_controller/config/appconfig.rb @@ -205,6 +205,8 @@ exit 1 end +AppConfig[:staging][:queue] ||= "staging" + if AppConfig[:bootstrap_users] unless AppConfig[:bootstrap_users].kind_of?(Array) $stderr.puts "List of bootstrap users must be an array" diff --git a/cloud_controller/lib/staging_task_manager.rb b/cloud_controller/lib/staging_task_manager.rb deleted file mode 100644 index 245fdce3f..000000000 --- a/cloud_controller/lib/staging_task_manager.rb +++ /dev/null @@ -1,57 +0,0 @@ -require 'nats/client' - -require 'vcap/stager/task' -require 'vcap/stager/task_result' - -class StagingTaskManager - DEFAULT_TASK_TIMEOUT = 120 - - def initialize(opts={}) - @logger = opts[:logger] || VCAP::Logging.logger('vcap.cc.staging_manager') - @nats_conn = opts[:nats_conn] || NATS.client - @timeout = opts[:timeout] || DEFAULT_TASK_TIMEOUT - end - - # Enqueues a staging task in redis, blocks until task completes or a timeout occurs. - # - # @param app App The app to be staged - # @param dl_uri String URI that the stager can download the app from - # @param ul_uri String URI that the stager should upload the staged droplet to - # - # @return VCAP::Stager::TaskResult - def run_staging_task(app, dl_uri, ul_uri) - inbox = "cc.staging." + VCAP.secure_uuid - f = Fiber.current - - # Wait for notification from the stager - exp_timer = nil - sid = @nats_conn.subscribe(inbox) do |msg| - @logger.debug("Received result from stager on '#{inbox}' : '#{msg}'") - @nats_conn.unsubscribe(sid) - EM.cancel_timer(exp_timer) - f.resume(msg) - end - - # Setup timer to expire our request if we don't hear a response from the stager in time - exp_timer = EM.add_timer(@timeout) do - @logger.warn("Staging timed out for app_id=#{app.id} (timeout=#{@timeout}, unsubscribing from '#{inbox}'", - :tags => [:staging]) - @nats_conn.unsubscribe(sid) - f.resume(nil) - end - - task = VCAP::Stager::Task.new(app.id, app.staging_task_properties, dl_uri, ul_uri, inbox) - task.enqueue('staging') - @logger.debug("Enqeued staging task for app_id=#{app.id}.", :tags => [:staging]) - - reply = Fiber.yield - if reply - result = VCAP::Stager::TaskResult.decode(reply) - StagingTaskLog.new(app.id, result.task_log).save - else - result = VCAP::Stager::TaskResult.new(nil, nil, "Timed out waiting for stager's reply.") - end - - result - end -end diff --git a/cloud_controller/spec/models/staging_task_log_spec.rb b/cloud_controller/spec/models/staging_task_log_spec.rb index 880c4c517..157379eb5 100644 --- a/cloud_controller/spec/models/staging_task_log_spec.rb +++ b/cloud_controller/spec/models/staging_task_log_spec.rb @@ -58,7 +58,7 @@ Fiber.new do expect do res = StagingTaskLog.fetch_fibered(@task_id, @redis_mock) - end.to raise_error(VCAP::Stager::TaskError) + end.to raise_error(CloudError) end.resume @deferrable_mock.fail(nil) end diff --git a/cloud_controller/spec/staging/staging_task_manager_spec.rb b/cloud_controller/spec/staging/staging_task_manager_spec.rb deleted file mode 100644 index 3ac31f29d..000000000 --- a/cloud_controller/spec/staging/staging_task_manager_spec.rb +++ /dev/null @@ -1,46 +0,0 @@ -require 'spec_helper' - -describe StagingTaskManager do - before :all do - # Prevent EM/NATS related initializers from running - EM.instance_variable_set(:@next_tick_queue, []) - end - - describe '#run_staging_task' do - it 'should expire long running tasks' do - nats_conn = mock() - logger = stub_everything(:logger) - - nats_conn.expects(:subscribe).with(any_parameters()) - nats_conn.expects(:unsubscribe).with(any_parameters()) - - task = stub_everything(:task) - VCAP::Stager::Task.expects(:new).with(any_parameters()).returns(task) - - app = create_stub_app(12345) - stm = StagingTaskManager.new( - :logger => logger, - :nats_conn => nats_conn, - :timeout => 1 - ) - - res = nil - EM.run do - Fiber.new do - EM.add_timer(2) { EM.stop } - res = stm.run_staging_task(app, nil, nil) - end.resume - end - - res.should be_instance_of(VCAP::Stager::TaskResult) - res.was_success?.should be_false - end - end - - def create_stub_app(id, props={}) - ret = mock("app_#{id}") - ret.stubs(:id).returns(id) - ret.stubs(:staging_task_properties).returns(props) - ret - end -end diff --git a/cloud_controller/vendor/cache/stager-client-0.0.2.gem b/cloud_controller/vendor/cache/stager-client-0.0.2.gem new file mode 100644 index 000000000..70af46eb3 Binary files /dev/null and b/cloud_controller/vendor/cache/stager-client-0.0.2.gem differ diff --git a/cloud_controller/vendor/cache/vcap_stager-0.1.14.gem b/cloud_controller/vendor/cache/vcap_stager-0.1.14.gem deleted file mode 100644 index 9bdb4dd1f..000000000 Binary files a/cloud_controller/vendor/cache/vcap_stager-0.1.14.gem and /dev/null differ diff --git a/stager/Gemfile b/stager/Gemfile index 0d39d9d53..2d788b40c 100644 --- a/stager/Gemfile +++ b/stager/Gemfile @@ -9,6 +9,7 @@ gem 'vcap_common', '~> 1.0.8' gem 'vcap_logging', '>= 0.1.3' gem 'vcap_staging', '~> 0.1.55' gem 'vcap-concurrency', '~> 0.0.1' +gem 'stager-client', '~> 0.0.2' group :test do gem 'rspec' diff --git a/stager/Gemfile.lock b/stager/Gemfile.lock index 355f581fb..dd343ec71 100644 --- a/stager/Gemfile.lock +++ b/stager/Gemfile.lock @@ -30,6 +30,10 @@ GEM sinatra (1.2.6) rack (~> 1.1) tilt (>= 1.2.2, < 2.0) + stager-client (0.0.2) + eventmachine + nats + yajl-ruby thin (1.3.1) daemons (>= 1.0.9) eventmachine (>= 0.12.6) @@ -67,6 +71,7 @@ DEPENDENCIES rake rspec sinatra + stager-client (~> 0.0.2) vcap-concurrency (~> 0.0.1) vcap_common (~> 1.0.8) vcap_logging (>= 0.1.3) diff --git a/stager/lib/vcap/stager/server.rb b/stager/lib/vcap/stager/server.rb index 12052ffeb..4353e76c2 100644 --- a/stager/lib/vcap/stager/server.rb +++ b/stager/lib/vcap/stager/server.rb @@ -61,7 +61,10 @@ def shutdown @shutdown_thread = Thread.new do # Blocks until all threads have finished @thread_pool.shutdown - EM.next_tick { EM.stop } + EM.next_tick do + EM.stop + @logger.info("Shutdown complete") + end end end @@ -88,10 +91,13 @@ def install_signal_handlers def setup_subscriptions @config[:queues].each do |q| - sq = "vcap.stager.#{q}" - @sids << @nats_conn.subscribe(sq, :queue => sq) do |msg, reply_to| - add_task(msg) + @sids << @nats_conn.subscribe(q, :queue => q) do |msg, reply_to| + @thread_pool.enqueue { execute_request(msg, reply_to) } + + @logger.info("Enqueued request #{msg}") end + + @logger.info("Subscribed to #{q}") end end @@ -101,38 +107,37 @@ def teardown_subscriptions @sids = [] end - def add_task(task_msg) + def execute_request(encoded_request, reply_to) begin - @logger.debug("Decoding task '#{task_msg}'") + @logger.debug("Decoding request '#{encoded_request}'") - request = Yajl::Parser.parse(task_msg) + request = Yajl::Parser.parse(encoded_request) rescue => e - @logger.warn("Failed decoding '#{task_msg}': #{e}") + @logger.warn("Failed decoding '#{encoded_request}': #{e}") @logger.warn(e) return end - @thread_pool.enqueue { execute_request(request) } - - @logger.info("Enqueued request #{request}") - - nil - end - - def execute_request(request) task = VCAP::Stager::Task.new(request, @task_config) result = nil begin task.perform - result = VCAP::Stager::TaskResult.new(task.task_id, task.log) + result = { + "task_id" => task.task_id, + "task_log" => task.log, + } @logger.info("Task #{task.task_id} succeeded") rescue VCAP::Stager::TaskError => te @logger.warn("Task #{task.task_id} failed: #{te}") - result = VCAP::Stager::TaskResult.new(task.task_id, task.log, te) + result = { + "task_id" => task.task_id, + "task_log" => task.log, + "error" => te.to_s, + } rescue Exception => e @logger.error("Unexpected exception: #{e}") @logger.error(e) @@ -140,7 +145,9 @@ def execute_request(request) raise e end - EM.next_tick { @nats_conn.publish(request["notify_subj"], result.encode) } + encoded_result = Yajl::Encoder.encode(result) + + EM.next_tick { @nats_conn.publish(reply_to, encoded_result) } nil end diff --git a/stager/lib/vcap/stager/task.rb b/stager/lib/vcap/stager/task.rb index c9683d79c..9d6f7d5ce 100644 --- a/stager/lib/vcap/stager/task.rb +++ b/stager/lib/vcap/stager/task.rb @@ -142,7 +142,7 @@ def stage_app(src_dir, dst_dir, task_logger) if res[:timed_out] emsg = "Staging timed out after #{@max_staging_duration} seconds." else - emsg = "Staging plugin failed: #{res[:stdout]}" + emsg = "Staging plugin failed: #{res[:stderr]}" end task_logger.warn(emsg) diff --git a/stager/spec/functional/stager_spec.rb b/stager/spec/functional/stager_spec.rb index b4b6e6dc6..87ce68031 100644 --- a/stager/spec/functional/stager_spec.rb +++ b/stager/spec/functional/stager_spec.rb @@ -1,6 +1,7 @@ require 'spec_helper' require 'vcap/spec/forked_component/nats_server' +require 'vcap/stager/client' describe VCAP::Stager do before :all do @@ -51,13 +52,11 @@ "properties" => @app_props, "download_uri" => DummyHandler.app_download_uri(@http_server, app_name), "upload_uri" => DummyHandler.droplet_upload_uri(@http_server, app_name), - "notify_subj" => "staging.result", } task_result = wait_for_task_result(@nats_server.uri, request) - task_result.should_not be_nil - task_result.was_success?.should be_true + task_result["error"].should be_nil end end @@ -95,15 +94,24 @@ def start_stager(nats_port, manifest_dir, stager_dir) end def wait_for_task_result(nats_uri, request) - task_result = nil + ret = nil + NATS.start(:uri => nats_uri) do - EM.add_timer(@task_timeout) { NATS.stop } - NATS.subscribe(request["notify_subj"]) do |msg| - task_result = VCAP::Stager::TaskResult.decode(msg) + client = VCAP::Stager::Client::EmAware.new(NATS.client, "staging") + + deferrable = client.stage(request, 30) + + deferrable.callback do |r| + ret = r + NATS.stop + end + + deferrable.errback do |e| + ret = { "error" => e } NATS.stop end - VCAP::Stager::Task.new(request).enqueue('staging') end - task_result + + ret end end diff --git a/stager/vendor/cache/stager-client-0.0.2.gem b/stager/vendor/cache/stager-client-0.0.2.gem new file mode 100644 index 000000000..70af46eb3 Binary files /dev/null and b/stager/vendor/cache/stager-client-0.0.2.gem differ