Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
add mutex protecting ports and available_memory; other minor updates …
…in preparation for r6 merge

Change-Id: Ib19092e3ca0fd661e9dd13aacc6e6e361e95b9b1
  • Loading branch information
SonicWang committed Nov 11, 2011
1 parent 133f757 commit d0b57a3
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 10 deletions.
3 changes: 2 additions & 1 deletion blob/bin/blob_gateway
Expand Up @@ -18,7 +18,8 @@ class VCAP::Services::Blob::Gateway < VCAP::Services::Base::Gateway
end

def default_config_file
File.join(File.dirname(__FILE__), '..', 'config', 'blob_gateway.yml')
config_base_dir = ENV["CLOUD_FOUNDRY_CONFIG_PATH"] || File.join(File.dirname(__FILE__), '..', 'config')
File.join(config_base_dir, 'blob_gateway.yml')
end

end
Expand Down
3 changes: 2 additions & 1 deletion blob/bin/blob_node
Expand Up @@ -17,7 +17,8 @@ class VCAP::Services::Blob::NodeBin < VCAP::Services::Base::NodeBin
end

def default_config_file
File.join(File.dirname(__FILE__), '..', 'config', 'blob_node.yml')
config_base_dir = ENV["CLOUD_FOUNDRY_CONFIG_PATH"] || File.join(File.dirname(__FILE__), '..', 'config')
File.join(config_base_dir, 'blob_node.yml')
end

def additional_config(options, config)
Expand Down
2 changes: 2 additions & 0 deletions blob/config/blob_gateway.yml
Expand Up @@ -19,3 +19,5 @@ logging:
pid: /var/vcap/sys/run/blob_service.pid
# allow_over_provisioning: false
# z_interval: 30
# check_orphan_interval: 3600
# double_check_orphan_interval: 300
3 changes: 2 additions & 1 deletion blob/config/blob_node.yml
Expand Up @@ -16,4 +16,5 @@ blobd_path: /var/vcap/packages/blob/bin
port_range:
first: 45001
last: 65000
#z_interval: 30
# z_interval: 30
# max_nats_payload: 1048576
47 changes: 40 additions & 7 deletions blob/lib/blob_service/blob_node.rb
Expand Up @@ -100,14 +100,48 @@ def initialize(options)

@free_ports = Set.new
options[:port_range].each {|port| @free_ports << port}
@mutex = Mutex.new
end

def fetch_port(port=nil)
@mutex.synchronize do
port ||= @free_ports.first
raise "port #{port} is already taken!" unless @free_ports.include?(port)
@free_ports.delete(port)
port
end
end

def return_port(port)
@mutex.synchronize do
@free_ports << port
end
end

def delete_port(port)
@mutex.synchronize do
@free_ports.delete(port)
end
end

def inc_memory(memory)
@mutex.synchronize do
@available_memory += memory
end
end

def dec_memory(memory)
@mutex.synchronize do
@available_memory -= memory
end
end

def pre_send_announcement
ProvisionedService.all.each do |provisioned_service|
@free_ports.delete(provisioned_service.port)
delete_port(provisioned_service.port)
if provisioned_service.listening?
@logger.warn("Service #{provisioned_service.name} already listening on port #{provisioned_service.port}")
@available_memory -= (provisioned_service.memory || @max_memory)
dec_memory(provisioned_service.memory || @max_memory)
next
end

Expand Down Expand Up @@ -178,8 +212,7 @@ def all_bindings_list
# will be re-used by restore codes; thus credential could be none null
def provision(plan, credential = nil)
@logger.debug("Provision a service instance")
port = credential && credential['port'] && @free_ports.include?(credential['port']) ? credential['port'] : @free_ports.first
@free_ports.delete(port)
port = credential && credential['port'] ? fetch_port(credential['port']) : fetch_port
name = credential && credential['name'] ? credential['name'] : UUIDTools::UUID.random_create.to_s

username = credential && credential['username'] ? credential['username'] : UUIDTools::UUID.random_create.to_s
Expand Down Expand Up @@ -233,8 +266,8 @@ def cleanup_service(provisioned_service)
FileUtils.rm_rf(dir)
FileUtils.rm_rf(log_dir)
end
@available_memory += provisioned_service.memory
@free_ports << provisioned_service.port
inc_memory(provisioned_service.memory)
return_port(provisioned_service.port)
raise "Could not cleanup service: #{provisioned_service.errors.pretty_inspect}" unless provisioned_service.destroy
true
end
Expand Down Expand Up @@ -339,7 +372,7 @@ def start_instance(provisioned_service)
pid = fork
if pid
@logger.debug("Service #{provisioned_service.name} started with pid #{pid}")
@available_memory -= memory
dec_memory(memory)
# In parent, detach the child.
Process.detach(pid)
pid
Expand Down

0 comments on commit d0b57a3

Please sign in to comment.