-
Notifications
You must be signed in to change notification settings - Fork 356
/
pool.rb
106 lines (85 loc) · 3.29 KB
/
pool.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
require 'cloud_controller/dea/nats_messages/dea_advertisment'
require 'cloud_controller/dea/eligible_advertisement_filter'
module VCAP::CloudController
module Dea
# Keeps an in-memory table of DEA advertisements received over the message
# bus (keyed by the advertisement's dea_id) and selects DEAs from that table
# for running or staging apps.
#
# Every read or write of the advertisement table happens while holding a
# single Mutex owned by the pool.
class Pool
  # @param config [Hash] reads :dea_advertisement_timeout_in_seconds,
  #   :placement_top_stager_percentage (nil is treated as 0) and
  #   :minimum_candidate_stagers
  # @param message_bus [#subscribe] bus used by #register_subscriptions
  def initialize(config, message_bus)
    @advertise_timeout = config[:dea_advertisement_timeout_in_seconds]
    @message_bus = message_bus
    @percentage_of_top_stagers = (config[:placement_top_stager_percentage] || 0) / 100.0
    @dea_advertisements = {}
    @min_candidate_stagers = config[:minimum_candidate_stagers]
    # Created eagerly: a lazily memoized mutex can be instantiated twice when
    # two threads hit the first call at the same time, which defeats the lock.
    @mutex = Mutex.new
  end

  # Subscribes to 'dea.advertise' and 'dea.shutdown' on the message bus and
  # routes each received message to the matching process_* method.
  def register_subscriptions
    message_bus.subscribe('dea.advertise') { |msg| process_advertise_message(msg) }
    message_bus.subscribe('dea.shutdown') { |msg| process_shutdown_message(msg) }
  end

  # Stores (or replaces) the advertisement for the DEA that sent +message+.
  #
  # @param message [Object] raw payload handed to NatsMessages::DeaAdvertisement
  def process_advertise_message(message)
    advertisement = build_advertisement(message)

    mutex.synchronize do
      @dea_advertisements[advertisement.dea_id] = advertisement
    end
  end

  # Drops the stored advertisement of the DEA that sent +message+.
  #
  # @param message [Object] raw payload handed to NatsMessages::DeaAdvertisement
  def process_shutdown_message(message)
    # The advertisement object is built only to extract the dea_id.
    dea_id = build_advertisement(message).dea_id

    mutex.synchronize do
      @dea_advertisements.delete(dea_id)
    end
  end

  # Picks a DEA advertisement for running an app instance.
  #
  # Expired advertisements are pruned first; the remaining ones are narrowed
  # down by EligibleAdvertisementFilter and one survivor is taken via #sample.
  #
  # @param criteria [Hash] reads :app_id, :disk (nil is treated as 0), :mem
  #   and :stack
  # @return [Object, nil] whatever the filter's #sample returns
  def find_dea(criteria)
    mutex.synchronize do
      prune_stale_deas

      EligibleAdvertisementFilter.new(@dea_advertisements, criteria[:app_id]).
        only_with_disk(criteria[:disk] || 0).
        only_meets_needs(criteria[:mem], criteria[:stack]).
        only_in_zone_with_fewest_instances.
        only_fewest_instances_of_app.
        upper_half_by_memory.
        sample
    end
  end

  # Picks a DEA advertisement for staging.
  #
  # NOTE: stack availability is checked before stale advertisements are
  # pruned, so an expired advertisement still counts for that check.
  #
  # @param stack [Object] stack the stager must offer
  # @param memory [Object] memory requirement passed to #meets_needs?
  # @param disk [Object] disk requirement passed to #has_sufficient_disk?
  # @return [Object, nil] a randomly sampled advertisement among the top
  #   candidates, or nil when no advertisement qualifies
  # @raise [CloudController::Errors::ApiError] 'StackNotFound' when no stored
  #   advertisement has the stack
  def find_stager(stack, memory, disk)
    mutex.synchronize do
      validate_stack_availability(stack)
      prune_stale_deas

      # Candidates are [dea_id, advertisement] pairs; return the advertisement.
      chosen = top_n_stagers_for(memory, disk, stack).sample
      chosen && chosen[1]
    end
  end

  # Records on the stored advertisement that an instance of the app started
  # on the DEA.
  #
  # The advertisement may have been removed (shutdown message or expiry)
  # since the caller selected the DEA; in that case there is nothing to
  # update and the call is a no-op instead of raising NoMethodError on nil.
  #
  # @param opts [Hash] reads :dea_id and :app_id
  # @return [Object, nil] result of #increment_instance_count, or nil when the
  #   DEA is no longer known
  def mark_app_started(opts)
    dea_id = opts[:dea_id]
    app_id = opts[:app_id]

    mutex.synchronize do
      advertisement = @dea_advertisements[dea_id]
      advertisement && advertisement.increment_instance_count(app_id)
    end
  end

  # Deducts +app_memory+ from the stored advertisement of the given DEA.
  #
  # Like #mark_app_started this is a no-op when the DEA is no longer known.
  #
  # @param dea_id [Object] key of the advertisement to update
  # @param app_memory [Object] amount passed to #decrement_memory
  # @return [Object, nil] result of #decrement_memory, or nil when the DEA is
  #   no longer known
  def reserve_app_memory(dea_id, app_memory)
    mutex.synchronize do
      advertisement = @dea_advertisements[dea_id]
      advertisement && advertisement.decrement_memory(app_memory)
    end
  end

  private

  attr_reader :message_bus, :mutex

  # Wraps a raw bus message in a DeaAdvertisement. The second constructor
  # argument is "now + advertisement timeout" in epoch seconds (presumably the
  # expiry consulted by #expired? -- confirm against DeaAdvertisement).
  def build_advertisement(message)
    NatsMessages::DeaAdvertisement.new(message, Time.now.utc.to_i + @advertise_timeout)
  end

  # Removes every advertisement that reports itself expired at the current
  # time. Callers must hold the mutex.
  def prune_stale_deas
    now = Time.now.utc.to_i
    @dea_advertisements.delete_if { |_, ad| ad.expired?(now) }
  end

  # Raises the 'StackNotFound' API error unless at least one stored
  # advertisement has the stack. Callers must hold the mutex.
  def validate_stack_availability(stack)
    return if @dea_advertisements.any? { |_, ad| ad.has_stack?(stack) }

    raise CloudController::Errors::ApiError.new_from_details('StackNotFound', "The requested app stack #{stack} is not available on this system.")
  end

  # Returns the [dea_id, advertisement] pairs that satisfy the memory, stack
  # and disk requirements, limited to the N with the most available memory.
  # N is the larger of the configured minimum candidate count and the
  # configured percentage of all stored advertisements, truncated to an
  # integer. Callers must hold the mutex.
  def top_n_stagers_for(memory, disk, stack)
    candidate_count = [@min_candidate_stagers, @percentage_of_top_stagers * @dea_advertisements.size].max.to_i

    eligible = @dea_advertisements.select do |_id, ad|
      ad.meets_needs?(memory, stack) && ad.has_sufficient_disk?(disk)
    end

    # Ascending sort, so the tail holds the advertisements with most memory.
    eligible.sort_by { |_id, ad| ad.available_memory }.last(candidate_count)
  end
end
end
end