Permalink
Browse files

Final stager changes

- Rewrite stager to use NATS queue groups instead of Resque.
- Add support for staging uploads to nginx config
- Integrate CC with stager
- Add support to bin/vcap for controlling stager/redis instance

Test plan:
* Ran BVTs locally with and w/out the new stager, and with and w/out nginx support.
* Ran BVTs against my deployment with and without the new stager.
* Ran unit tests for stager and CC.

Change-Id: I3c85d7de58f518d6111ca3dbf6c55fc532350d9d
  • Loading branch information...
1 parent 40a43e3 commit 474deed6021c67e2db0cf27a441f5a16b5bed009 mpage committed Aug 12, 2011
View
@@ -10,9 +10,11 @@ gem 'logging', '>= 1.5.0'
# VCAP common components
gem 'vcap_common', :require => ['vcap/common', 'vcap/component'], :path => '../common'
gem 'vcap_logging', :require => ['vcap/logging']
+gem 'vcap_staging', '= 0.1.2'
-# XXX - Vendor once working
-gem 'vcap_staging'
+# For queuing staging tasks
+gem 'em-hiredis'
+gem 'vcap_stager', '= 0.1.3'
# Databases
gem 'sqlite3'
@@ -45,6 +47,7 @@ gem 'bcrypt-ruby', '>= 2.1.4'
gem 'ruby-hmac', :require => 'hmac-sha1'
gem 'SystemTimer', :platforms => :mri_18
gem 'uuidtools'
+gem 'rest-client', '= 1.6.7'
# rspec-rails is outside the 'test' group in order to consistently provide Rake tasks.
gem 'rspec-rails', '>= 2.4.1'
@@ -48,6 +48,8 @@ GEM
builder (>= 2.1.2)
daemons (1.1.2)
diff-lcs (1.1.2)
+ em-hiredis (0.1.0)
+ hiredis (~> 0.3.0)
em-http-request (1.0.0.beta.3)
addressable (>= 2.2.3)
em-socksify
@@ -60,6 +62,7 @@ GEM
erubis (2.6.6)
abstract (>= 1.0.0)
eventmachine (0.12.10)
+ hiredis (0.3.2)
http_parser.rb (0.5.1)
i18n (0.5.0)
json_pure (1.5.1)
@@ -103,6 +106,8 @@ GEM
thor (~> 0.14.4)
rake (0.8.7)
rcov (0.9.9)
+ rest-client (1.6.7)
+ mime-types (>= 1.16)
rspec (2.5.0)
rspec-core (~> 2.5.0)
rspec-expectations (~> 2.5.0)
@@ -132,7 +137,13 @@ GEM
tzinfo (0.3.26)
uuidtools (2.1.2)
vcap_logging (0.1.0)
- vcap_staging (0.1.0)
+ vcap_stager (0.1.3)
+ vcap_staging (0.1.2)
+ nokogiri (>= 1.4.4)
+ rake
+ rspec
+ vcap_common
+ yajl-ruby (>= 0.7.9)
yajl-ruby (0.8.2)
PLATFORMS
@@ -142,6 +153,7 @@ DEPENDENCIES
SystemTimer
bcrypt-ruby (>= 2.1.4)
ci_reporter
+ em-hiredis
em-http-request (~> 1.0.0.beta.3)
em-redis
eventmachine (~> 0.12.10)
@@ -154,6 +166,7 @@ DEPENDENCIES
rack-fiber_pool
rails (~> 3.0.5)
rcov
+ rest-client (= 1.6.7)
rspec (>= 2.4.0)
rspec-rails (>= 2.4.1)
ruby-hmac
@@ -163,5 +176,6 @@ DEPENDENCIES
uuidtools
vcap_common!
vcap_logging
- vcap_staging
+ vcap_stager (= 0.1.3)
+ vcap_staging (= 0.1.2)
yajl-ruby (>= 0.7.9)
@@ -1,3 +1,5 @@
+require 'staging_task_manager'
+
class AppsController < ApplicationController
before_filter :require_user, :except => [:download_staged]
before_filter :find_app_by_name, :except => [:create, :list, :download_staged]
@@ -83,7 +85,7 @@ def upload
end
def download
- path = @app.package_path
+ path = @app.unstaged_package_path
if path && File.exists?(path)
send_file path
else
@@ -124,7 +126,13 @@ def start_update
error_on_lock_mismatch(@app)
@app.lock_version += 1
manager = AppManager.new(@app)
- manager.stage if @app.needs_staging?
+ if @app.needs_staging?
+ if user.uses_new_stager?
+ stage_app(@app)
+ else
+ manager.stage
+ end
+ end
manager.stop_all
manager.started
render :nothing => true, :status => 204
@@ -164,6 +172,18 @@ def check_update
# GET /apps/:name/instances/:instance_id/files/:path'
def files
+ # XXX - Yuck. This will have to do until we update VMC with a real
+ # way to fetch staging logs.
+ if user.uses_new_stager? && (params[:path] == 'logs/staging.log')
+ log = StagingTaskLog.fetch_fibered(@app.id)
+ if log
+ render :text => log.task_log
+ else
+ render :nothing => true, :status => 404
+ end
+ return
+ end
+
# will Fiber.yield
url, auth = AppManager.new(@app).get_file_url(params[:instance_id], params[:path])
raise CloudError.new(CloudError::APP_FILE_ERROR, params[:path] || '/') unless url
@@ -191,6 +211,43 @@ def files
private
+ def stage_app(app)
+ task_mgr = StagingTaskManager.new(:logger => CloudController.logger,
+ :timeout => AppConfig[:staging][:max_staging_runtime])
+ dl_uri = StagingController.download_app_uri(app)
+ ul_hdl = StagingController.create_upload(app)
+
+ result = task_mgr.run_staging_task(app, dl_uri, ul_hdl.upload_uri)
+
+ # Update run count to be consistent with previous staging code
+ if result.was_success?
+ CloudController.logger.debug("Staging task for app_id=#{app.id} succeded.", :tags => [:staging])
+ CloudController.logger.debug1("Details: #{result.task_log}", :tags => [:staging])
+ app.update_staged_package(ul_hdl.upload_path)
+ app.package_state = 'STAGED'
+ app.update_run_count()
+ else
+ CloudController.logger.warn("Staging task for app_id=#{app.id} failed: #{result.error}",
+ :tags => [:staging])
+ CloudController.logger.debug1("Details: #{result.task_log}", :tags => [:staging])
+ raise CloudError.new(CloudError::APP_STAGING_ERROR, result.error.to_s)
+ end
+
+ rescue => e
+ # It may be the case that upload from the stager will happen sometime in the future.
+ # Mark the upload as completed so that any upload that occurs in the future will fail.
+ if ul_hdl
+ StagingController.complete_upload(ul_hdl)
+ FileUtils.rm_f(ul_hdl.upload_path)
+ end
+ app.package_state = 'FAILED'
+ app.update_run_count()
+ raise e
+
+ ensure
+ app.save!
+ end
+
def find_app_by_name
# XXX - What do we want semantics to be like for multiple apps w/ same name (possible w/ contribs)
@app = user.apps_owned.find_by_name(params[:name])
@@ -208,7 +265,6 @@ def find_app_by_name
# App from the request params and makes the necessary AppManager calls.
def update_app_from_params(app)
CloudController.logger.debug "app: #{app.id || "nil"} update_from_parms"
-
error_on_lock_mismatch(app)
app.lock_version += 1
@@ -247,7 +303,14 @@ def update_app_from_params(app)
# Process any changes that require action on out part here.
manager = AppManager.new(app)
- manager.stage if app.needs_staging?
+
+ if app.needs_staging?
+ if user.uses_new_stager?
+ stage_app(app)
+ else
+ manager.stage
+ end
+ end
if changed.include?('state')
if app.stopped?
@@ -405,4 +468,6 @@ def check_has_capacity_for?(app, previous_state)
raise CloudError.new(CloudError::ACCOUNT_NOT_ENOUGH_MEMORY, "#{mem_quota}M")
end
end
+
+
end
@@ -0,0 +1,137 @@
+require 'uri'
+
+# Handles app downloads and droplet uploads from the stagers.
+#
+class StagingController < ApplicationController
+ skip_before_filter :fetch_user_from_token
+ before_filter :authenticate_stager
+
+ class DropletUploadHandle
+ attr_reader :upload_id, :upload_path, :upload_uri, :app
+
+ def initialize(app)
+ @app = app
+ @upload_id = VCAP.secure_uuid
+ @upload_path = File.join(AppConfig[:directories][:tmpdir],
+ "staged_upload_#{app.id}_#{@upload_id}.tgz")
+ @upload_uri = StagingController.upload_droplet_uri(app, @upload_id)
+ end
+ end
+
+ class << self
+ def upload_droplet_uri(app, upload_id)
+ staging_uri("/staging/droplet/#{app.id}/#{upload_id}")
+ end
+
+ def download_app_uri(app)
+ staging_uri("/staging/app/#{app.id}")
+ end
+
+ def create_upload(app)
+ @uploads ||= {}
+ ret = DropletUploadHandle.new(app)
+ @uploads[ret.upload_id] = ret
+ ret
+ end
+
+ def lookup_upload(upload_id)
+ @uploads ||= {}
+ @uploads[upload_id]
+ end
+
+ def complete_upload(handle)
+ return unless @uploads
+ @uploads.delete(handle.upload_id)
+ end
+
+ private
+
+ def staging_uri(path)
+ uri = URI::HTTP.build(
+ :host => CloudController.bind_address,
+ :port => CloudController.external_port,
+ :userinfo => [AppConfig[:staging][:auth][:user], AppConfig[:staging][:auth][:password]],
+ :path => path
+ )
+ uri.to_s
+ end
+ end
+
+ # Handles a droplet upload from a stager
+ def upload_droplet
+ upload = nil
+ src_path = nil
+ app = App.find_by_id(params[:id])
+ raise CloudError.new(CloudError::APP_NOT_FOUND) unless app
+
+ upload = self.class.lookup_upload(params[:upload_id])
+ unless upload
+ CloudController.logger.error("No upload set for upload_id=#{params[:upload_id]}")
+ raise CloudError.new(CloudError::BAD_REQUEST)
+ end
+
+ if CloudController.use_nginx
+ src_path = params[:droplet_path]
+ else
+ src_path = params[:upload][:droplet].path
+ end
+ unless src_path && File.exist?(src_path)
+ CloudController.logger.error("Uploaded droplet not found at '#{src_path}'")
+ raise CloudError.new(CloudError::BAD_REQUEST)
+ end
+
+ begin
+ CloudController.logger.debug("Renaming staged droplet from '#{src_path}' to '#{upload.upload_path}'")
+ File.rename(src_path, upload.upload_path)
+ rescue => e
+ CloudController.logger.error("Failed uploading staged droplet: #{e}", :tags => [:staging])
+ CloudController.logger.error(e)
+ FileUtils.rm_f(upload.upload_path)
+ raise e
+ end
+ CloudController.logger.debug("Stager (#{request.remote_ip}) uploaded droplet to #{upload.upload_path}",
+ :tags => [:staging])
+ render :nothing => true, :status => 200
+ ensure
+ FileUtils.rm_f(src_path) if src_path
+ self.class.complete_upload(upload) if upload
+ end
+
+ # Handles an app download from a stager
+ def download_app
+ app = App.find_by_id(params[:id])
+ raise CloudError.new(CloudError::APP_NOT_FOUND) unless app
+
+ path = app.unstaged_package_path
+ unless path && File.exists?(path)
+ CloudController.logger.error("Couldn't find package path for app_id=#{app.id} (stager=#{request.remote_ip})", :tags => [:staging])
+ raise CloudError.new(CloudError::APP_NOT_FOUND)
+ end
+ CloudController.logger.debug("Stager (#{request.remote_ip}) requested app_id=#{app.id} @ path=#{path}", :tags => [:staging])
+
+ if path && File.exists?(path)
+ if CloudController.use_nginx
+ response.headers['X-Accel-Redirect'] = '/droplets/' + File.basename(path)
+ render :nothing => true, :status => 200
+ else
+ send_file path
+ end
+ else
+ raise CloudError.new(CloudError::APP_NOT_FOUND)
+ end
+ end
+
+ private
+
+ def authenticate_stager
+ authenticate_or_request_with_http_basic do |user, pass|
+ if (user == AppConfig[:staging][:auth][:user]) && (pass == AppConfig[:staging][:auth][:password])
+ true
+ else
+ CloudController.logger.error("Stager auth failed (user=#{user}, pass=#{pass} from #{request.remote_ip}", :tags => [:auth_failure, :staging])
+ false
+ end
+ end
+ end
+
+end
@@ -123,6 +123,15 @@ def staging_environment_data
:resources => resource_requirements }
end
+ def staging_task_properties
+ services = service_bindings(true).map {|sb| sb.for_staging}
+ { :services => services,
+ :framework => framework,
+ :runtime => runtime,
+ :resources => resource_requirements,
+ :environment => environment}
+ end
+
# Returns an array of the URLs that point to this application
def mapped_urls
routes.active.map {|r| r.url}.sort
@@ -509,6 +518,19 @@ def remove_collaborator(user)
collab.destroy if collab
end
+ def update_staged_package(upload_path)
+ self.staged_package_hash = Digest::SHA1.file(upload_path).hexdigest
+ FileUtils.mv(upload_path, self.staged_package_path)
+ end
+
+ def update_run_count
+ if self..staged_package_hash_changed?
+ self.run_count = 0 # reset
+ else
+ self.run_count += 1
+ end
+ end
+
private
# TODO - Remove this when the VMC client has been updated to match our new strings.
@@ -1,3 +1,5 @@
+require 'vcap/stager'
+
class AppManager
attr_reader :app
Oops, something went wrong.

0 comments on commit 474deed

Please sign in to comment.