Permalink
Browse files

Decouple Stager and CloudController

CC uses StagerClient

Change-Id: I76e60c142af0fece68a34354f8919626e16145b9
  • Loading branch information...
1 parent b07d8d3 commit 30927e0d37da6837409f9866d7334384b8f2109b mpage committed May 2, 2012
View
@@ -13,7 +13,7 @@ gem 'cf-uaa-client', '>= 0.0.8'
# For queuing staging tasks
gem 'em-hiredis'
-gem 'vcap_stager', '~> 0.1.14'
+gem 'stager-client', '~> 0.0.2'
# Databases
gem 'sqlite3'
@@ -121,6 +121,10 @@ GEM
rack (~> 1.1)
tilt (>= 1.2.2, < 2.0)
sqlite3 (1.3.3)
+ stager-client (0.0.2)
+ eventmachine
+ nats
+ yajl-ruby
thin (1.3.1)
daemons (>= 1.0.9)
eventmachine (>= 0.12.6)
@@ -138,7 +142,6 @@ GEM
thin (~> 1.3.1)
yajl-ruby (~> 0.8.3)
vcap_logging (1.0.1)
- vcap_stager (0.1.14)
vcap_staging (0.1.55)
nokogiri (>= 1.4.4)
rake
@@ -175,10 +178,10 @@ DEPENDENCIES
ruby-hmac (~> 0.4.0)
sinatra
sqlite3
+ stager-client (~> 0.0.2)
thin
uuidtools (~> 2.1.2)
vcap_common
vcap_logging
- vcap_stager (~> 0.1.14)
vcap_staging (~> 0.1.55)
yajl-ruby (~> 0.8.3)
@@ -1,4 +1,4 @@
-require 'staging_task_manager'
+require "vcap/stager/client"
class AppsController < ApplicationController
before_filter :require_user, :except => [:download_staged]
@@ -219,25 +219,36 @@ def files
private
def stage_app(app)
- task_mgr = StagingTaskManager.new(:logger => CloudController.logger,
- :timeout => AppConfig[:staging][:max_staging_runtime])
dl_uri = StagingController.download_app_uri(app)
ul_hdl = StagingController.create_upload(app)
- result = task_mgr.run_staging_task(app, dl_uri, ul_hdl.upload_uri)
+ client = VCAP::Stager::Client::FiberAware.new(NATS.client,
+ AppConfig[:staging][:queue])
- # Update run count to be consistent with previous staging code
- if result.was_success?
- CloudController.logger.debug("Staging task for app_id=#{app.id} succeded.", :tags => [:staging])
- CloudController.logger.debug1("Details: #{result.task_log}", :tags => [:staging])
+ request = {
+ "app_id" => app.id,
+ "properties" => app.staging_task_properties,
+ "download_uri" => dl_uri,
+ "upload_uri" => ul_hdl.upload_uri,
+ }
+
+ begin
+ result = client.stage(request, AppConfig[:staging][:max_staging_runtime])
+ StagingTaskLog.new(app.id, result["task_log"]).save
+ rescue VCAP::Stager::Client::Error => e
+ result = { "error" => e.to_s }
+ end
+
+ if result["error"]
+ CloudController.logger.warn("Staging for app_id=#{app.id} failed")
+ CloudController.logger.warn("Error: #{result["error"]}")
+ raise CloudError.new(CloudError::APP_STAGING_ERROR, result["error"])
+ else
+ # Update run count to be consistent with previous staging code
+ CloudController.logger.info("Staging for app_id=#{app.id} succeeded")
app.update_staged_package(ul_hdl.upload_path)
app.package_state = 'STAGED'
app.update_run_count()
- else
- CloudController.logger.warn("Staging task for app_id=#{app.id} failed: #{result.error}",
- :tags => [:staging])
- CloudController.logger.debug1("Details: #{result.task_log}", :tags => [:staging])
- raise CloudError.new(CloudError::APP_STAGING_ERROR, result.error.to_s)
end
rescue => e
@@ -1,5 +1,3 @@
-require 'vcap/stager'
-
class AppManager
attr_reader :app
@@ -17,7 +17,7 @@ def fetch_fibered(app_id, redis=nil, timeout=5)
get_def = redis.get(key)
get_def.timeout(timeout)
get_def.errback do |e|
- e = VCAP::Stager::StagingTimeoutError.new("Timed out fetching result") if e == nil
+ e = CloudError.new(CloudError::SYSTEM_ERROR, "Timed out fetching result") if e == nil
logger.error("Failed fetching result for key '#{key}': #{e}")
logger.error(e)
f.resume([false, e])
@@ -205,6 +205,8 @@
exit 1
end
+AppConfig[:staging][:queue] ||= "staging"
+
if AppConfig[:bootstrap_users]
unless AppConfig[:bootstrap_users].kind_of?(Array)
$stderr.puts "List of bootstrap users must be an array"
@@ -1,57 +0,0 @@
-require 'nats/client'
-
-require 'vcap/stager/task'
-require 'vcap/stager/task_result'
-
-class StagingTaskManager
- DEFAULT_TASK_TIMEOUT = 120
-
- def initialize(opts={})
- @logger = opts[:logger] || VCAP::Logging.logger('vcap.cc.staging_manager')
- @nats_conn = opts[:nats_conn] || NATS.client
- @timeout = opts[:timeout] || DEFAULT_TASK_TIMEOUT
- end
-
- # Enqueues a staging task in redis, blocks until task completes or a timeout occurs.
- #
- # @param app App The app to be staged
- # @param dl_uri String URI that the stager can download the app from
- # @param ul_uri String URI that the stager should upload the staged droplet to
- #
- # @return VCAP::Stager::TaskResult
- def run_staging_task(app, dl_uri, ul_uri)
- inbox = "cc.staging." + VCAP.secure_uuid
- f = Fiber.current
-
- # Wait for notification from the stager
- exp_timer = nil
- sid = @nats_conn.subscribe(inbox) do |msg|
- @logger.debug("Received result from stager on '#{inbox}' : '#{msg}'")
- @nats_conn.unsubscribe(sid)
- EM.cancel_timer(exp_timer)
- f.resume(msg)
- end
-
- # Setup timer to expire our request if we don't hear a response from the stager in time
- exp_timer = EM.add_timer(@timeout) do
- @logger.warn("Staging timed out for app_id=#{app.id} (timeout=#{@timeout}, unsubscribing from '#{inbox}'",
- :tags => [:staging])
- @nats_conn.unsubscribe(sid)
- f.resume(nil)
- end
-
- task = VCAP::Stager::Task.new(app.id, app.staging_task_properties, dl_uri, ul_uri, inbox)
- task.enqueue('staging')
- @logger.debug("Enqeued staging task for app_id=#{app.id}.", :tags => [:staging])
-
- reply = Fiber.yield
- if reply
- result = VCAP::Stager::TaskResult.decode(reply)
- StagingTaskLog.new(app.id, result.task_log).save
- else
- result = VCAP::Stager::TaskResult.new(nil, nil, "Timed out waiting for stager's reply.")
- end
-
- result
- end
-end
@@ -58,7 +58,7 @@
Fiber.new do
expect do
res = StagingTaskLog.fetch_fibered(@task_id, @redis_mock)
- end.to raise_error(VCAP::Stager::TaskError)
+ end.to raise_error(CloudError)
end.resume
@deferrable_mock.fail(nil)
end
@@ -1,46 +0,0 @@
-require 'spec_helper'
-
-describe StagingTaskManager do
- before :all do
- # Prevent EM/NATS related initializers from running
- EM.instance_variable_set(:@next_tick_queue, [])
- end
-
- describe '#run_staging_task' do
- it 'should expire long running tasks' do
- nats_conn = mock()
- logger = stub_everything(:logger)
-
- nats_conn.expects(:subscribe).with(any_parameters())
- nats_conn.expects(:unsubscribe).with(any_parameters())
-
- task = stub_everything(:task)
- VCAP::Stager::Task.expects(:new).with(any_parameters()).returns(task)
-
- app = create_stub_app(12345)
- stm = StagingTaskManager.new(
- :logger => logger,
- :nats_conn => nats_conn,
- :timeout => 1
- )
-
- res = nil
- EM.run do
- Fiber.new do
- EM.add_timer(2) { EM.stop }
- res = stm.run_staging_task(app, nil, nil)
- end.resume
- end
-
- res.should be_instance_of(VCAP::Stager::TaskResult)
- res.was_success?.should be_false
- end
- end
-
- def create_stub_app(id, props={})
- ret = mock("app_#{id}")
- ret.stubs(:id).returns(id)
- ret.stubs(:staging_task_properties).returns(props)
- ret
- end
-end
Binary file not shown.
Binary file not shown.
View
@@ -9,6 +9,7 @@ gem 'vcap_common', '~> 1.0.8'
gem 'vcap_logging', '>= 0.1.3'
gem 'vcap_staging', '~> 0.1.55'
gem 'vcap-concurrency', '~> 0.0.1'
+gem 'stager-client', '~> 0.0.2'
group :test do
gem 'rspec'
View
@@ -30,6 +30,10 @@ GEM
sinatra (1.2.6)
rack (~> 1.1)
tilt (>= 1.2.2, < 2.0)
+ stager-client (0.0.2)
+ eventmachine
+ nats
+ yajl-ruby
thin (1.3.1)
daemons (>= 1.0.9)
eventmachine (>= 0.12.6)
@@ -67,6 +71,7 @@ DEPENDENCIES
rake
rspec
sinatra
+ stager-client (~> 0.0.2)
vcap-concurrency (~> 0.0.1)
vcap_common (~> 1.0.8)
vcap_logging (>= 0.1.3)
@@ -61,7 +61,10 @@ def shutdown
@shutdown_thread = Thread.new do
# Blocks until all threads have finished
@thread_pool.shutdown
- EM.next_tick { EM.stop }
+ EM.next_tick do
+ EM.stop
+ @logger.info("Shutdown complete")
+ end
end
end
@@ -88,10 +91,13 @@ def install_signal_handlers
def setup_subscriptions
@config[:queues].each do |q|
- sq = "vcap.stager.#{q}"
- @sids << @nats_conn.subscribe(sq, :queue => sq) do |msg, reply_to|
- add_task(msg)
+ @sids << @nats_conn.subscribe(q, :queue => q) do |msg, reply_to|
+ @thread_pool.enqueue { execute_request(msg, reply_to) }
+
+ @logger.info("Enqueued request #{msg}")
end
+
+ @logger.info("Subscribed to #{q}")
end
end
@@ -101,46 +107,47 @@ def teardown_subscriptions
@sids = []
end
- def add_task(task_msg)
+ def execute_request(encoded_request, reply_to)
begin
- @logger.debug("Decoding task '#{task_msg}'")
+ @logger.debug("Decoding request '#{encoded_request}'")
- request = Yajl::Parser.parse(task_msg)
+ request = Yajl::Parser.parse(encoded_request)
rescue => e
- @logger.warn("Failed decoding '#{task_msg}': #{e}")
+ @logger.warn("Failed decoding '#{encoded_request}': #{e}")
@logger.warn(e)
return
end
- @thread_pool.enqueue { execute_request(request) }
-
- @logger.info("Enqueued request #{request}")
-
- nil
- end
-
- def execute_request(request)
task = VCAP::Stager::Task.new(request, @task_config)
result = nil
begin
task.perform
- result = VCAP::Stager::TaskResult.new(task.task_id, task.log)
+ result = {
+ "task_id" => task.task_id,
+ "task_log" => task.log,
+ }
@logger.info("Task #{task.task_id} succeeded")
rescue VCAP::Stager::TaskError => te
@logger.warn("Task #{task.task_id} failed: #{te}")
- result = VCAP::Stager::TaskResult.new(task.task_id, task.log, te)
+ result = {
+ "task_id" => task.task_id,
+ "task_log" => task.log,
+ "error" => te.to_s,
+ }
rescue Exception => e
@logger.error("Unexpected exception: #{e}")
@logger.error(e)
raise e
end
- EM.next_tick { @nats_conn.publish(request["notify_subj"], result.encode) }
+ encoded_result = Yajl::Encoder.encode(result)
+
+ EM.next_tick { @nats_conn.publish(reply_to, encoded_result) }
nil
end
@@ -142,7 +142,7 @@ def stage_app(src_dir, dst_dir, task_logger)
if res[:timed_out]
emsg = "Staging timed out after #{@max_staging_duration} seconds."
else
- emsg = "Staging plugin failed: #{res[:stdout]}"
+ emsg = "Staging plugin failed: #{res[:stderr]}"
end
task_logger.warn(emsg)
Oops, something went wrong.

0 comments on commit 30927e0

Please sign in to comment.