Skip to content

Commit

Permalink
allow IsolatedProcessingModules to use multiple VMs
Browse files Browse the repository at this point in the history
  • Loading branch information
gaelmuller committed Sep 25, 2017
1 parent 1ff6b64 commit f56b7b9
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 19 deletions.
10 changes: 10 additions & 0 deletions fame/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ def list_value(list_of_values):
return list(result)


def ordered_list_value(list_of_values):
result = []

for value in list_of_values.split(','):
value = value.strip()
result.append(value)

return result


def send_file_to_remote(file, url):
if isinstance(file, basestring):
file = open(file, 'rb')
Expand Down
56 changes: 37 additions & 19 deletions fame/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from fame.common.constants import MODULES_ROOT
from fame.common.exceptions import ModuleInitializationError, ModuleExecutionError, MissingConfiguration
from fame.common.utils import iterify, is_iterable, list_value, save_response
from fame.common.utils import iterify, is_iterable, list_value, save_response, ordered_list_value
from fame.common.mongo_dict import MongoDict
from fame.core.config import Config, apply_config_update, incomplete_config
from fame.core.internals import Internals
Expand Down Expand Up @@ -541,25 +541,25 @@ class IsolatedProcessingModule(ProcessingModule):
{
'name': 'label',
'type': 'str',
'description': 'Label of the virtual machine to use.'
'description': 'Label of the virtual machine to use. Several VMs can be specified by using a comma-delimited list of labels.'
},
{
'name': 'snapshot',
'type': 'str',
'default': None,
'description': 'Name of the snaptshot to restore clean state.'
'description': 'Name of the snapshot to use to restore clean state.'
},
{
'name': 'ip_address',
'type': 'str',
'default': '127.0.0.1',
'description': 'IP address of the guest. 127.0.0.1 can only be used with NAT and port forwarding.'
'description': 'IP address of the guest. 127.0.0.1 can only be used with NAT and port forwarding. When using muliple VMs, specify all IP addresses in a comma-delimited list.'
},
{
'name': 'port',
'type': 'str',
'default': '4242',
'description': 'Port the agent is listening to. You might have to change this value if you are using NAT and port forwarding.'
'description': 'Port the agent is listening to. You might have to change this value if you are using NAT and port forwarding. When using muliple VMs, specify all ports in a comma-delimited list.'
},
{
'name': 'always_ready',
Expand All @@ -578,8 +578,13 @@ class IsolatedProcessingModule(ProcessingModule):
def initialize(self):
self.task_id = None
self.should_restore = False
self.base_url = "http://{}:{}".format(self.ip_address, self.port)
self.vm_record = "{}|{}".format(self.virtualization, self.label)

self.labels = ordered_list_value(self.label)
self.ip_addresses = ordered_list_value(self.ip_address)
self.ports = ordered_list_value(self.port)

if not (len(self.labels) == len(self.ip_addresses) == len(self.ports)):
raise ModuleInitializationError(self, "List values for 'label', 'ip_address' and 'port' must contain exactly the same number of elements.")

def __init__(self, with_config=True):
ProcessingModule.__init__(self, with_config)
Expand Down Expand Up @@ -679,6 +684,11 @@ def _get_results(self):

return results['_results']['result']

def _use_vm(self, index):
self.locked_label = self.labels[index]
self.base_url = "http://{}:{}".format(self.ip_addresses[index], self.ports[index])
self.vm_record = "{}|{}".format(self.virtualization, self.locked_label)

def _acquire_lock(self):
LOCK_TIMEOUT = timedelta(minutes=120)
WAIT_STEP = 15
Expand All @@ -689,19 +699,27 @@ def _acquire_lock(self):
vms = Internals({'name': 'virtual_machines'})
vms.save()

last_locked = "{}.last_locked".format(self.vm_record)
while True:
if vms.update_value([self.vm_record, 'locked'], True):
vms.update_value([self.vm_record, 'last_locked'], datetime.now())
break
locked_vm = False
while not locked_vm:
for i, label in enumerate(self.labels):
self._use_vm(i)

expired_date = datetime.now() - LOCK_TIMEOUT
if vms._update({'$set': {last_locked: datetime.now()}},
{last_locked: {'$lt': expired_date}}):
vms.update_value([self.vm_record, 'locked'], True)
break
last_locked = "{}.last_locked".format(self.vm_record)

if vms.update_value([self.vm_record, 'locked'], True):
vms.update_value([self.vm_record, 'last_locked'], datetime.now())
locked_vm = True
break

expired_date = datetime.now() - LOCK_TIMEOUT
if vms._update({'$set': {last_locked: datetime.now()}},
{last_locked: {'$lt': expired_date}}):
vms.update_value([self.vm_record, 'locked'], True)
locked_vm = True
break

sleep(WAIT_STEP)
if not locked_vm:
sleep(WAIT_STEP)

def _release_lock(self):
vms = Internals.get(name='virtual_machines')
Expand All @@ -714,7 +732,7 @@ def _init_vm(self):
if self._vm is None:
raise ModuleExecutionError('missing (or disabled) virtualization module: {}'.format(self.virtualization))

self._vm.initialize(self.label, self.base_url, self.snapshot)
self._vm.initialize(self.locked_label, self.base_url, self.snapshot)
self._vm.prepare()

def _restore_vm(self):
Expand Down

0 comments on commit f56b7b9

Please sign in to comment.