Skip to content

Commit

Permalink
Improve VM launch parallelism. Fix #233
Browse files Browse the repository at this point in the history
  • Loading branch information
micafer committed Feb 27, 2017
1 parent 1d1f2ad commit 6548136
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 81 deletions.
179 changes: 98 additions & 81 deletions IM/InfrastructureManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from IM.config import Config
from IM.VirtualMachine import VirtualMachine
from IM.InfrastructureInfo import InfrastructureInfo

if Config.MAX_SIMULTANEOUS_LAUNCHES > 1:
from multiprocessing.pool import ThreadPool
Expand Down Expand Up @@ -143,89 +144,119 @@ def root(n):
return deploy_groups

@staticmethod
def _launch_group(sel_inf, deploy_group, deploys_group_cloud_list, cloud_list, concrete_systems,
radl, auth, deployed_vm, cancel_deployment):
"""Launch a group of deploys together."""

if not deploy_group:
InfrastructureManager.logger.warning("No VMs to deploy!")
return
if not deploys_group_cloud_list:
cancel_deployment.append(Exception("No cloud provider available"))
return
def _launch_vm(sel_inf, task, cloud_id, deploy_group, auth,
deployed_vm, cancel_deployment, exceptions):
fail_cont = 0
all_ok = False
exceptions = []
for cloud_id in deploys_group_cloud_list:
cloud = cloud_list[cloud_id]
all_ok = True
for deploy in deploy_group:
remain_vm, fail_cont = deploy.vm_number, 0
while (remain_vm > 0 and fail_cont < Config.MAX_VM_FAILS and
not cancel_deployment):
concrete_system = concrete_systems[cloud_id][deploy.id][0]
if not concrete_system:
InfrastructureManager.logger.error(
"Error, no concrete system to deploy: " + deploy.id + " in cloud: " +
cloud_id + ". Check if a correct image is being used")
exceptions.append("Error, no concrete system to deploy: " +
deploy.id + ". Check if a correct image is being used.")
break
for task_cloud in task:
cloud, deploy, launch_radl, requested_radl, remain_vm, vm_type = task_cloud

(username, _, _, _) = concrete_system.getCredentialValues()
if not username:
raise IncorrectVMCrecentialsException(
"No username for deploy: " + deploy.id)
InfrastructureManager.logger.debug(
"Launching %d VMs of type %s" % (remain_vm, vm_type))
try:
launched_vms = cloud.cloud.getCloudConnector(sel_inf).launch(
sel_inf, launch_radl, requested_radl, remain_vm, auth)
except Exception, e:
InfrastructureManager.logger.exception("Error launching some of the VMs: %s" % e)
exceptions.append("Error launching the VMs of type %s to cloud ID %s"
" of type %s. Cloud Provider Error: %s" % (vm_type,
cloud.cloud.id,
cloud.cloud.type, e))
launched_vms = []

all_ok = True
for success, launched_vm in launched_vms:
if success:
InfrastructureManager.logger.debug("VM successfully launched: " + str(launched_vm.id))
deployed_vm.setdefault(deploy, []).append(launched_vm)
deploy.cloud_id = cloud_id
remain_vm -= 1
else:
all_ok = False
InfrastructureManager.logger.warn("Error launching some of the VMs: " + str(launched_vm))
exceptions.append("Error launching the VMs of type %s to cloud ID %s of type %s. %s" % (
vm_type, cloud.cloud.id, cloud.cloud.type, str(launched_vm)))
if not isinstance(launched_vm, (str, unicode)):
cloud.finalize(launched_vm, auth)

launch_radl = radl.clone()
launch_radl.systems = [concrete_system.clone()]
requested_radl = radl.clone()
requested_radl.systems = [
radl.get_system_by_name(concrete_system.name)]
try:
InfrastructureManager.logger.debug(
"Launching %d VMs of type %s" % (remain_vm, concrete_system.name))
launched_vms = cloud.cloud.getCloudConnector(sel_inf).launch(
sel_inf, launch_radl, requested_radl, remain_vm, auth)
except Exception, e:
InfrastructureManager.logger.exception("Error launching some of the VMs: %s" % e)
exceptions.append("Error launching the VMs of type %s to cloud ID %s"
" of type %s. Cloud Provider Error: %s" % (concrete_system.name,
cloud.cloud.id,
cloud.cloud.type, e))
launched_vms = []
for success, launched_vm in launched_vms:
if success:
InfrastructureManager.logger.debug(
"VM successfully launched: " + str(launched_vm.id))
deployed_vm.setdefault(
deploy, []).append(launched_vm)
deploy.cloud_id = cloud_id
remain_vm -= 1
else:
InfrastructureManager.logger.warn(
"Error launching some of the VMs: " + str(launched_vm))
exceptions.append("Error launching the VMs of type %s to cloud ID %s of type %s. %s" % (
concrete_system.name, cloud.cloud.id, cloud.cloud.type, str(launched_vm)))
if not isinstance(launched_vm, (str, unicode)):
cloud.finalize(launched_vm, auth)
fail_cont += 1
if remain_vm > 0 or cancel_deployment:
all_ok = False
fail_cont += 1
break

if not all_ok:
# Something has failed, finalize the VMs created and try with other cloud provider (if avail)
for deploy in deploy_group:
for vm in deployed_vm.get(deploy, []):
vm.finalize(auth)
deployed_vm[deploy] = []
if cancel_deployment or all_ok:
else:
# All was OK so do not try with other cloud provider
break

if not all_ok and not cancel_deployment:
msg = ""
for i, e in enumerate(exceptions):
msg += "Attempt " + str(i + 1) + ": " + str(e) + "\n"
cancel_deployment.append(
Exception("All machines could not be launched: \n%s" % msg))

@staticmethod
def _launch_groups(sel_inf, deploy_groups, deploys_group_cloud_list_all, cloud_list, concrete_systems,
                   radl, auth, deployed_vm, cancel_deployment):
    """Build one launch task per deploy group and run every task.

    For each deploy group a task is assembled: a list with one entry per
    (candidate cloud, deploy) pair, holding the cloud, the deploy, the RADL
    to launch, the RADL as requested, the number of VMs and the system name.
    The tasks are then handed to ``InfrastructureManager._launch_vm``, through
    a thread pool when ``Config.MAX_SIMULTANEOUS_LAUNCHES`` is greater than 1
    and one after another otherwise.

    Args:
        sel_inf: infrastructure passed through to ``_launch_vm``.
        deploy_groups: iterable of deploy groups (each a collection of deploys).
        deploys_group_cloud_list_all: mapping from ``id(deploy_group)`` to the
            list of candidate cloud ids for that group.
        cloud_list: mapping from cloud id to the cloud object.
        concrete_systems: nested mapping indexed as
            ``concrete_systems[cloud_id][deploy.id][0]``.
        radl: RADL that is cloned to build the launch and requested RADLs.
        auth: authorization data passed through to ``_launch_vm``.
        deployed_vm: dict passed through to ``_launch_vm``.
        cancel_deployment: list used as an error channel; any exception that
            reaches this method is appended to it instead of being raised.
    """
    try:
        tasks = []
        for deploy_group in deploy_groups:
            deploys_group_cloud_list = deploys_group_cloud_list_all[id(deploy_group)]
            if not deploy_group:
                # An empty group only means there is nothing to do for it;
                # the remaining groups must still be launched.
                InfrastructureManager.logger.warning("No VMs to deploy!")
                continue
            if not deploys_group_cloud_list:
                cancel_deployment.append(Exception("No cloud provider available"))
                return
            exceptions = []
            task_cloud = []
            for cloud_id in deploys_group_cloud_list:
                cloud = cloud_list[cloud_id]
                for deploy in deploy_group:
                    concrete_system = concrete_systems[cloud_id][deploy.id][0]
                    if not concrete_system:
                        InfrastructureManager.logger.error(
                            "Error, no concrete system to deploy: " + deploy.id + " in cloud: " +
                            cloud_id + ". Check if a correct image is being used")
                        exceptions.append("Error, no concrete system to deploy: " +
                                          deploy.id + ". Check if a correct image is being used.")
                        break

                    (username, _, _, _) = concrete_system.getCredentialValues()
                    if not username:
                        raise IncorrectVMCrecentialsException("No username for deploy: " + deploy.id)

                    launch_radl = radl.clone()
                    launch_radl.systems = [concrete_system.clone()]
                    requested_radl = radl.clone()
                    requested_radl.systems = [radl.get_system_by_name(concrete_system.name)]
                    task_cloud.append((cloud, deploy, launch_radl, requested_radl,
                                       deploy.vm_number, concrete_system.name))

            if task_cloud:
                # Keep the per-group values together with the task. Reading the
                # loop variables later, at launch time, would hand every task
                # the values of the last group processed.
                # NOTE(review): _launch_vm takes a single cloud_id although a
                # task spans every candidate cloud of the group; the group's
                # last candidate is passed here, which is what was passed
                # before for a single-group deployment. Confirm the intent.
                tasks.append((task_cloud, cloud_id, deploy_group, exceptions))

        def run_task(group_task):
            # Launch one group's task with the values captured for that group.
            entries, last_cloud_id, group, group_exceptions = group_task
            InfrastructureManager._launch_vm(sel_inf, entries, last_cloud_id, group, auth,
                                             deployed_vm, cancel_deployment, group_exceptions)

        if Config.MAX_SIMULTANEOUS_LAUNCHES > 1:
            # Imported here because the module-level import is conditional on
            # the configured value at import time, which may have been 1.
            from multiprocessing.pool import ThreadPool
            pool = ThreadPool(processes=Config.MAX_SIMULTANEOUS_LAUNCHES)
            try:
                pool.map(run_task, tasks)
            finally:
                pool.close()
        else:
            for group_task in tasks:
                run_task(group_task)
    except Exception as e:
        # Please avoid letting exceptions reach this level, because some
        # virtual machines may be lost.
        cancel_deployment.append(e)

@staticmethod
def get_infrastructure(inf_id, auth):
"""Return infrastructure info with some id if valid authorization provided."""
Expand Down Expand Up @@ -517,23 +548,9 @@ def AddResource(inf_id, radl_data, auth, context=True, failed_clouds=[]):
# Launch every group in the same cloud provider
deployed_vm = {}
cancel_deployment = []
try:
if Config.MAX_SIMULTANEOUS_LAUNCHES > 1:
pool = ThreadPool(processes=Config.MAX_SIMULTANEOUS_LAUNCHES)
pool.map(
lambda ds: InfrastructureManager._launch_group(sel_inf, ds, deploys_group_cloud_list[id(ds)],
cloud_list, concrete_systems, radl, auth,
deployed_vm, cancel_deployment), deploy_groups)
pool.close()
else:
for ds in deploy_groups:
InfrastructureManager._launch_group(sel_inf, ds, deploys_group_cloud_list[id(ds)],
cloud_list, concrete_systems, radl,
auth, deployed_vm, cancel_deployment)
except Exception, e:
# Please, avoid exception to arrive to this level, because some virtual
# machine may lost.
cancel_deployment.append(e)
InfrastructureManager._launch_groups(sel_inf, deploy_groups, deploys_group_cloud_list,
cloud_list, concrete_systems, radl, auth,
deployed_vm, cancel_deployment)

# We make this to maintain the order of the VMs in the sel_inf.vm_list
# according to the deploys shown in the RADL
Expand Down
86 changes: 86 additions & 0 deletions test/unit/test_im_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ def gen_launch_res(self, inf, radl, requested_radl, num_vm, auth_data):
res.append((True, vm))
return res

def sleep_and_create_vm(self, inf, radl, requested_radl, num_vm, auth_data):
    """Slow stand-in for a connector ``launch``: 5 seconds per VM created.

    Returns a list with ``num_vm`` ``(True, vm)`` tuples, where every ``vm``
    is a running VirtualMachine with id "1234" on a "Dummy" cloud and a
    mocked ``get_ssh``.
    """
    def build_one():
        # The sleep comes first so each VM costs a fixed delay before it exists.
        time.sleep(5)
        dummy_cloud = CloudInfo()
        dummy_cloud.type = "Dummy"
        new_vm = VirtualMachine(inf, "1234", dummy_cloud, radl, requested_radl)
        new_vm.get_ssh = Mock(side_effect=self.get_dummy_ssh)
        new_vm.state = VirtualMachine.RUNNING
        return new_vm

    return [(True, build_one()) for _ in range(num_vm)]

def get_cloud_connector_mock(self, name="MyMock0"):
cloud = type(name, (CloudConnector, object), {})
cloud.launch = Mock(side_effect=self.gen_launch_res)
Expand Down Expand Up @@ -329,6 +341,80 @@ def concreteSystem(s, mem):
self.assertEqual(call[3], 1)
IM.DestroyInfrastructure(infId, auth0)

def test_inf_addresources4(self):
    """Check that one VM is still deployed when the first provider tried fails."""

    # The system offers an image on both mock providers.
    image_urls = ["mock0://linux.for.ev.er",
                  "mock1://linux.for.ev.er"]
    features = [Feature("disk.0.image.url", "=", image_urls),
                Feature("disk.0.os.credentials.username", "=", "user"),
                Feature("disk.0.os.credentials.password", "=", "pass")]
    radl = RADL()
    radl.add(system("s0", features))
    radl.add(deploy("s0", 1))

    # Mock0 launches normally; Mock1 always reports a failed launch.
    cloud0 = self.get_cloud_connector_mock("MyMock0")
    self.register_cloudconnector("Mock0", cloud0)
    cloud1 = type("MyMock1", (CloudConnector, object), {})
    cloud1.launch = Mock(return_value=[(False, "Error")])
    self.register_cloudconnector("Mock1", cloud1)

    # Mock1 is listed first in the auth data.
    auth0 = self.getAuth([0], [], [("Mock1", 1), ("Mock0", 0)])

    infId = IM.CreateInfrastructure("", auth0)
    vms = IM.AddResource(infId, str(radl), auth0)
    self.assertEqual(len(vms), 1)

    connectors = (cloud0, cloud1)
    # Each provider must have been asked exactly once ...
    for connector in connectors:
        self.assertEqual(connector.launch.call_count, 1)
    # ... and always for a single VM.
    for connector in connectors:
        for call, _ in connector.launch.call_args_list:
            self.assertEqual(call[3], 1)

    IM.DestroyInfrastructure(infId, auth0)

def test_0inf_addresources5(self):
    """Deploy n independent virtual machines, sequentially and through the pool.

    The connector's ``launch`` sleeps 5 seconds per VM, so the elapsed time
    shows whether the launches ran one after another
    (``MAX_SIMULTANEOUS_LAUNCHES = 1``) or simultaneously
    (``MAX_SIMULTANEOUS_LAUNCHES = 4``).
    """

    n = 4  # Machines to deploy
    radl = RADL()
    radl.add(system("s0", [Feature("disk.0.image.url", "=", "mock0://linux.for.ev.er"),
                           Feature("disk.0.os.credentials.username", "=", "user"),
                           Feature("disk.0.os.credentials.password", "=", "pass")]))
    radl.add(deploy("s0", n))
    cloud = type("MyMock0", (CloudConnector, object), {})
    cloud.launch = Mock(side_effect=self.sleep_and_create_vm)
    self.register_cloudconnector("Mock", cloud)
    auth0 = self.getAuth([0], [], [("Mock", 0)])
    infId = IM.CreateInfrastructure("", auth0)

    # Config is global state: remember the configured value so this test
    # does not change the launch mode of the tests that run after it.
    saved_max_launches = Config.MAX_SIMULTANEOUS_LAUNCHES
    try:
        # In this case it will take approximately 20 secs
        before = int(time.time())
        Config.MAX_SIMULTANEOUS_LAUNCHES = 1
        vms = IM.AddResource(infId, str(radl), auth0)
        delay = int(time.time()) - before
        self.assertLess(delay, 25)
        self.assertGreater(delay, 19)

        self.assertEqual(len(vms), n)
        self.assertEqual(cloud.launch.call_count, n)
        for call, _ in cloud.launch.call_args_list:
            self.assertEqual(call[3], 1)

        # Fresh connector mock so the call count starts again from zero.
        cloud = type("MyMock0", (CloudConnector, object), {})
        cloud.launch = Mock(side_effect=self.sleep_and_create_vm)
        self.register_cloudconnector("Mock", cloud)
        # In this case it will take approximately 5 secs
        before = int(time.time())
        Config.MAX_SIMULTANEOUS_LAUNCHES = 4  # Test the pool
        vms = IM.AddResource(infId, str(radl), auth0)
        delay = int(time.time()) - before
        self.assertLess(delay, 8)
        self.assertGreater(delay, 4)

        self.assertEqual(len(vms), n)
        self.assertEqual(cloud.launch.call_count, n)
        for call, _ in cloud.launch.call_args_list:
            self.assertEqual(call[3], 1)

        IM.DestroyInfrastructure(infId, auth0)
    finally:
        Config.MAX_SIMULTANEOUS_LAUNCHES = saved_max_launches

@patch('IM.VMRC.Client')
def test_inf_cloud_order(self, suds_cli):
"""Test cloud selection in base of the auth data order."""
Expand Down

0 comments on commit 6548136

Please sign in to comment.