Skip to content
This repository has been archived by the owner on Jan 25, 2022. It is now read-only.

Commit

Permalink
Merge "Merging side-by-side-poc branch back into master."
Browse files Browse the repository at this point in the history
  • Loading branch information
Bob Nugmanov authored and Gerrit Code Review committed Aug 17, 2012
2 parents 2676821 + 7a72469 commit 6411e33
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 26 deletions.
7 changes: 6 additions & 1 deletion cloud_controller/app/models/app_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ def update_uris
def updated
once_app_is_staged do
unless app.staging_failed?
NATS.publish('droplet.updated', Yajl::Encoder.encode(:droplet => app.id))
message = {
:droplet => app.id,
:cc_partition => AppConfig[:cc_partition]
}
NATS.publish('droplet.updated', Yajl::Encoder.encode(message))
end
end
end
Expand Down Expand Up @@ -573,6 +577,7 @@ def new_message
data[:limits] = app.limits
data[:env] = app.environment_variables
data[:users] = [app.owner.email] # XXX - should we collect all collabs here?
data[:cc_partition] = AppConfig[:cc_partition]
data
end

Expand Down
2 changes: 1 addition & 1 deletion cloud_controller/app/subscriptions/bulk_api.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
EM.next_tick do
NATS.subscribe('cloudcontroller.bulk.credentials') do |_, reply|
NATS.subscribe("cloudcontroller.bulk.credentials.#{AppConfig[:cc_partition]}") do |_, reply|
NATS.publish(reply, AppConfig[:bulk_api][:auth].to_json)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# if the staged application store is not 'shared' between all
# CloudControllers.

NATS.subscribe('cloudcontrollers.hm.requests', :queue => :cc) do |msg|
NATS.subscribe("cloudcontrollers.hm.requests.#{AppConfig[:cc_partition]}", :queue => :cc) do |msg|
begin
payload = Yajl::Parser.parse(msg, :symbolize_keys => true)
CloudController::UTILITY_FIBER_POOL.spawn do
Expand Down
4 changes: 4 additions & 0 deletions cloud_controller/config/appconfig.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@
AppConfig[:new_initial_placement] = false
end

unless AppConfig.key? :cc_partition
AppConfig[:cc_partition] = "default"
end

#generate bulk api credentials unless they've been explicitly specified (not that they should)
unless AppConfig.key? :bulk_api
AppConfig[:bulk_api] = { :auth =>
Expand Down
4 changes: 4 additions & 0 deletions cloud_controller/config/cloud_controller.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ allow_debug: true
# Enable new initial placement protocol.
new_initial_placement: true

# allows partitioning cc pool for the purpose of AB testing,
# gradual migration, etc
cc_partition: default

# Supported runtime versions and debug modes.
# Used for /info/runtimes endpoint (served unfiltered as JSON)
runtimes:
Expand Down
4 changes: 4 additions & 0 deletions health_manager/config/health_manager.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ intervals:
#default value is 50.
dequeueing_rate: 50

# allows partitioning the cc pool for the purpose of AB testing,
# gradual migration, etc. Each partition has a dedicated hm instance.
cc_partition: default

# Used for /healthz and /vars endpoints. If not provided random
# values will be generated on component start. Uncomment to use
# static values.
Expand Down
20 changes: 11 additions & 9 deletions health_manager/lib/health_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ def initialize(config)

@spindown_inactive_apps = @inactivity_period_for_spindown > 0

@cc_partition = config['cc_partition'] || "default"

@droplets = {}
@pending_restart = {}
@request_queue = VCAP::PrioritySet.new
Expand Down Expand Up @@ -480,15 +482,16 @@ def elapsed_time_in_ms(start)
end

def process_updated_message(message)
message = parse_json(message)
return unless message['cc_partition'] == @cc_partition
VCAP::Component.varz[:droplet_updated_msgs_received] += 1
droplet_id = parse_json(message)['droplet']
ensure_connected { update_droplet App.find_by_id(droplet_id) }
ensure_connected { update_droplet App.find_by_id(message['droplet']) }
end

def process_exited_message(message)
VCAP::Component.varz[:droplet_exited_msgs_received] += 1

exit_message = parse_json(message)
return unless exit_message['cc_partition'] == @cc_partition
VCAP::Component.varz[:droplet_exited_msgs_received] += 1
droplet_id = exit_message['droplet']
version = exit_message['version']
index = exit_message['index']
Expand Down Expand Up @@ -590,6 +593,7 @@ def process_heartbeat_message(message)
parsed_message = parse_json(message)
dea_prod = parsed_message['prod']
parsed_message['droplets'].each do |heartbeat|
next unless heartbeat['cc_partition'] == @cc_partition
droplet_id = heartbeat['droplet']
instance = heartbeat['instance']
droplet_entry = @droplets[droplet_id]
Expand Down Expand Up @@ -636,8 +640,6 @@ def process_active_apps_message(message)
droplet_entry = @droplets[app_id]
if droplet_entry
droplet_entry[:last_activity] = now
else
@logger.warn("Droplet went away but is still showing activity, app_id=#{app_id}")
end
end
end
Expand Down Expand Up @@ -826,7 +828,7 @@ def publish_start_message(start_message)
end

@logger.info("Requesting the start of missing instances: #{start_message}")
NATS.publish('cloudcontrollers.hm.requests', encode_json(start_message))
NATS.publish("cloudcontrollers.hm.requests.#{@cc_partition}", encode_json(start_message))
end

def queue_request(message, high_priority)
Expand All @@ -849,7 +851,7 @@ def stop_instances(droplet_id, instances)
:last_updated => last_updated,
:instances => instances
})
NATS.publish('cloudcontrollers.hm.requests', stop_message)
NATS.publish("cloudcontrollers.hm.requests.#{@cc_partition}", stop_message)
@logger.info("Requesting the stop of extra instances: #{stop_message}")
end

Expand All @@ -861,7 +863,7 @@ def spindown(droplet_id)
:op => :SPINDOWN
})

NATS.publish('cloudcontrollers.hm.requests',message)
NATS.publish("cloudcontrollers.hm.requests.#{@cc_partition}",message)
end

def configure_timers
Expand Down
7 changes: 4 additions & 3 deletions health_manager/spec/functional/health_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
#test start missing instances
it 'should start missing instances' do
app = nil
msg = receive_message 'cloudcontrollers.hm.requests' do
msg = receive_message 'cloudcontrollers.hm.requests.default' do
#putting enough into Expected State to trigger an instance START request
app = @helper.make_app_with_owner_and_instance(
make_app_def( 'to_be_started_app'),
Expand All @@ -111,7 +111,7 @@

it 'should not start missing instances with empty staged_package_hash' do
app = nil
msg = receive_message('cloudcontrollers.hm.requests', false) do
msg = receive_message('cloudcontrollers.hm.requests.default', false) do

app_def = make_app_def('non_sane_app')
app_def.delete(:staged_package_hash)
Expand All @@ -127,13 +127,14 @@
app = @helper.make_app_with_owner_and_instance(make_app_def('crasher'), make_user_def)
crash_msg = {
'droplet' => app.id,
'cc_partition' => "default",
'version' => 0,
'instance' => 0,
'index' => 0,
'reason' => 'CRASHED',
'crash_timestamp' => Time.now.to_i
}
msg = receive_message 'cloudcontrollers.hm.requests' do
msg = receive_message 'cloudcontrollers.hm.requests.default' do
NATS.publish('droplet.exited', crash_msg.to_json)
end
msg.should_not be_nil
Expand Down
29 changes: 18 additions & 11 deletions health_manager/spec/health_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,13 @@ def make_heartbeat_message(options = {})
droplets = []
indices.each do |index|
droplets << {
'droplet' => @app.id, 'index' => index, 'instance' => "badbeef-#{index}", 'state' => 'RUNNING',
'version' => @droplet_entry[:live_version], 'state_timestamp' => @droplet_entry[:last_updated]
'droplet' => @app.id,
'cc_partition' => "default",
'index' => index,
'instance' => "badbeef-#{index}",
'state' => 'RUNNING',
'version' => @droplet_entry[:live_version],
'state_timestamp' => @droplet_entry[:last_updated]
}.merge(options)
end
{ 'droplets' => droplets }
Expand All @@ -133,6 +138,7 @@ def make_heartbeat_message(options = {})
def make_crashed_message(options={})
{
'droplet' => @app.id,
'cc_partition' => "default",
'version' => "#{@app.staged_package_hash}-#{@app.run_count}",
'index' => 0,
'instance' => "badbeef-0",
Expand Down Expand Up @@ -167,7 +173,7 @@ def get_live_index(droplet_entry,index)

it "should detect instances that are down and send a START request" do
stats = { :frameworks => {}, :runtimes => {}, :down => 0 }
should_publish_to_nats "cloudcontrollers.hm.requests", {
should_publish_to_nats "cloudcontrollers.hm.requests.default", {
'droplet' => @app.id,
'op' => 'START',
'last_updated' => @app.last_updated.to_i,
Expand All @@ -192,7 +198,7 @@ def get_live_index(droplet_entry,index)
2 => { :state => 'RUNNING', :timestamp => timestamp, :last_action => @app.last_updated, :instance => '2' },
3 => { :state => 'RUNNING', :timestamp => timestamp, :last_action => @app.last_updated, :instance => '3' }
}}
should_publish_to_nats "cloudcontrollers.hm.requests", {
should_publish_to_nats "cloudcontrollers.hm.requests.default", {
'droplet' => @app.id,
'op' => 'STOP',
'last_updated' => @app.last_updated.to_i,
Expand Down Expand Up @@ -238,7 +244,7 @@ def activity_message
end

it "should spindown app with no acitvity at all" do
should_publish_to_nats('cloudcontrollers.hm.requests', {
should_publish_to_nats('cloudcontrollers.hm.requests.default', {
:droplet => @app.id,
:op => :SPINDOWN
})
Expand Down Expand Up @@ -267,7 +273,7 @@ def activity_message
end

it "should spindown an app with stale activity" do
should_publish_to_nats('cloudcontrollers.hm.requests', {
should_publish_to_nats('cloudcontrollers.hm.requests.default', {
:droplet => @app.id,
:op => :SPINDOWN
})
Expand Down Expand Up @@ -332,15 +338,15 @@ def activity_message
'last_updated' => stoppee_instance[:timestamp],
'instances' => [stoppee_instance[:instance]]
}
should_publish_to_nats("cloudcontrollers.hm.requests", stop_message)
should_publish_to_nats("cloudcontrollers.hm.requests", make_restart_message('indices'=>[1]))
should_publish_to_nats("cloudcontrollers.hm.requests.default", stop_message)
should_publish_to_nats("cloudcontrollers.hm.requests.default", make_restart_message('indices'=>[1]))

@hm.analyze_app(@app.id, @droplet_entry, stats)
@hm.deque_a_batch_of_requests
end

def ensure_non_flapping_restart
should_publish_to_nats "cloudcontrollers.hm.requests", make_restart_message
should_publish_to_nats "cloudcontrollers.hm.requests.default", make_restart_message
@hm.process_heartbeat_message(make_heartbeat_message.to_json)
droplet_entry = @hm.process_exited_message(make_crashed_message.to_json)
@hm.deque_a_batch_of_requests
Expand All @@ -350,7 +356,7 @@ def ensure_non_flapping_restart

def ensure_flapping_delayed_restart(delay)
in_em_with_fiber do |f|
should_publish_to_nats "cloudcontrollers.hm.requests", make_restart_message('flapping' => true)
should_publish_to_nats "cloudcontrollers.hm.requests.default", make_restart_message('flapping' => true)

@hm.process_heartbeat_message(make_heartbeat_message.to_json)
droplet_entry = @hm.process_exited_message(make_crashed_message.to_json)
Expand Down Expand Up @@ -433,7 +439,7 @@ def in_em(timeout = 10)

apps.each do |app|

should_publish_to_nats("cloudcontrollers.hm.requests", {
should_publish_to_nats("cloudcontrollers.hm.requests.default", {
'droplet' => app.id ,
'op' => 'START',
'last_updated' => app.last_updated.to_i,
Expand All @@ -447,6 +453,7 @@ def in_em(timeout = 10)
apps.each do |app|
@hm.process_exited_message({
'droplet' => app.id,
'cc_partition' => "default",
'version' => "#{app.staged_package_hash}-#{app.run_count}",
'index' => 0,
'instance' => 0,
Expand Down

0 comments on commit 6411e33

Please sign in to comment.