Fix EC2 test_cloud timing issues
Fixes bug 877661

Unfortunately, these tests are really integration tests. Because of
that, rpc.cast is now stubbed to do an rpc.call, which ensures the
operations complete before the tests make their assertions, so the
sleeps are no longer needed.

This also uncovered some other issues with the tests, as well as a bug
in the network API where the wrong argument name was passed when
destroying a floating IP.

Change-Id: Ia7f40718533e450f00cd3e7d753ac65755c70588
comstud committed Oct 18, 2011
1 parent e92494e commit f0125d2
Showing 2 changed files with 42 additions and 87 deletions.
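
The core of the fix is the one-line stub that appears in the test_cloud.py hunks below. A minimal standalone sketch of the pattern, assuming the same stubout-style self.stubs fixture that nova's test.TestCase already provides (the class name here is hypothetical):

    from nova import rpc
    from nova import test


    class SynchronousCastTestCase(test.TestCase):
        def setUp(self):
            super(SynchronousCastTestCase, self).setUp()
            # rpc.cast() is fire-and-forget, so a test's assertions can
            # race the service the message was cast to.  rpc.call() has
            # the same (context, topic, msg) signature but blocks until
            # the manager method returns; substituting it makes every
            # cast finish before the test proceeds, with no sleeps.
            self.stubs.Set(rpc, 'cast', rpc.call)
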
nova/network/api.py (2 changes: 1 addition & 1 deletion)
@@ -79,7 +79,7 @@ def release_floating_ip(self, context, address,
rpc.cast(context,
FLAGS.network_topic,
{'method': 'deallocate_floating_ip',
'args': {'floating_address': address,
'args': {'address': address,
'affect_auto_assigned': affect_auto_assigned}})

def associate_floating_ip(self, context, floating_address, fixed_address,
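The one-word change above is the floating IP bug called out in the commit message. Nova's RPC consumer dispatches a message by invoking the manager method with the 'args' dict as keyword arguments, so each key has to match a parameter name; a stale key fails with a TypeError inside the service, and a fire-and-forget cast never reports that back. A sketch of the failure mode, with the manager-side signature assumed from the corrected key:

    def deallocate_floating_ip(context, address, affect_auto_assigned=False):
        """Stand-in for the network manager method; 'address' is assumed."""
        pass

    # Old message body: dispatch blows up before the method even runs.
    try:
        deallocate_floating_ip(None, **{'floating_address': '10.0.0.1'})
    except TypeError as e:
        print(e)  # ... unexpected keyword argument 'floating_address'

    # Fixed message body: the keyword matches the signature.
    deallocate_floating_ip(None, **{'address': '10.0.0.1'})

With cast stubbed to behave like call, that failure comes back to the caller instead of dying unnoticed in the network service.
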
nova/tests/api/ec2/test_cloud.py (127 changes: 41 additions & 86 deletions)
@@ -15,37 +15,48 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mox

import base64
import functools

from base64 import b64decode
from M2Crypto import BIO
from M2Crypto import RSA
import os

from eventlet import greenthread
from M2Crypto import BIO
from M2Crypto import RSA
import mox

from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.compute import vm_states
from nova import context
from nova import crypto
from nova import db
from nova import exception
from nova import flags
from nova.image import fake
from nova import log as logging
from nova import manager
from nova import network
from nova import rpc
from nova import test
from nova import utils
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.compute import vm_states
from nova.image import fake


FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.cloud')


flags.DEFINE_string('ajax_proxy_manager',
'nova.tests.api.ec2.test_cloud.AjaxProxyManager', '')


# Fake ajax proxy service, so that an 'rpc.call' will work.
class AjaxProxyManager(manager.SchedulerDependentManager):
@staticmethod
def authorize_ajax_console(context, **kwargs):
return None


class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
@@ -60,6 +71,7 @@ def setUp(self):
self.scheduter = self.start_service('scheduler')
self.network = self.start_service('network')
self.volume = self.start_service('volume')
self.ajax_proxy = self.start_service('ajax_proxy')
self.image_service = utils.import_object(FLAGS.image_service)

self.user_id = 'fake'
@@ -76,14 +88,9 @@ def fake_show(meh, context, id):
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
self.stubs.Set(fake._FakeImageService, 'show_by_name', fake_show)

# NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
rpc_cast = rpc.cast

def finish_cast(*args, **kwargs):
rpc_cast(*args, **kwargs)
greenthread.sleep(0.2)

self.stubs.Set(rpc, 'cast', finish_cast)
# NOTE(comstud): Make 'cast' behave like a 'call' which will
# ensure that operations complete
self.stubs.Set(rpc, 'cast', rpc.call)

def _create_key(self, name):
# NOTE(vish): create depends on pool, so just call helper directly
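
The new AjaxProxyManager and the ajax_proxy service started in setUp() are a direct consequence of the stub: rpc.call() blocks until a consumer on the target topic handles the message and sends back a result, so every topic the tests cast to must have a live service behind it. The fake manager exists purely to produce that reply. Illustratively (the topic flag name and the token argument are assumptions):

    # With cast stubbed to call, a console authorization "cast" blocks
    # until something answers on the ajax proxy topic:
    result = rpc.call(context, FLAGS.ajax_console_proxy_topic,
                      {'method': 'authorize_ajax_console',
                       'args': {'token': 'fake-token'}})
    # AjaxProxyManager.authorize_ajax_console() returns None, which is
    # exactly the reply the blocked call needs in order to complete.
    assert result is None
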
@@ -126,7 +133,8 @@ def test_release_address(self):
allocate = self.cloud.allocate_address
db.floating_ip_create(self.context,
{'address': address,
'host': self.network.host})
'host': self.network.host,
'project_id': self.project_id})
result = self.cloud.release_address(self.context, address)
self.assertEqual(result['releaseResponse'], ['Address released.'])

@@ -459,7 +467,8 @@ def test_describe_snapshots(self):

def test_create_snapshot(self):
"""Makes sure create_snapshot works."""
vol = db.volume_create(self.context, {'status': "available"})
vol = db.volume_create(self.context,
{'status': "available", 'size': 0})
volume_id = ec2utils.id_to_ec2_vol_id(vol['id'])

result = self.cloud.create_snapshot(self.context,
@@ -474,9 +483,12 @@ def test_delete_snapshot(self):

def test_delete_snapshot(self):
"""Makes sure delete_snapshot works."""
vol = db.volume_create(self.context, {'status': "available"})
snap = db.snapshot_create(self.context, {'volume_id': vol['id'],
'status': "available"})
vol = db.volume_create(self.context,
{'status': "available", 'size': 0})
snap = db.snapshot_create(self.context,
{'volume_id': vol['id'],
'status': "available",
'volume_size': 0})
snapshot_id = ec2utils.id_to_ec2_snap_id(snap['id'])

result = self.cloud.delete_snapshot(self.context,
@@ -1066,19 +1078,15 @@ def _run_instance(self, **kwargs):
instance_id = rv['instancesSet'][0]['instanceId']
return instance_id

def _run_instance_wait(self, **kwargs):
ec2_instance_id = self._run_instance(**kwargs)
self._wait_for_running(ec2_instance_id)
return ec2_instance_id

def test_console_output(self):
instance_id = self._run_instance(
image_id='ami-1',
instance_type=FLAGS.default_instance_type,
max_count=1)
output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
self.assertEquals(base64.b64decode(output['output']),
'FAKE CONSOLE?OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
rv = self.cloud.terminate_instances(self.context, [instance_id])
@@ -1269,46 +1277,6 @@ def _restart_compute_service(self, periodic_interval=None):
else:
self.compute = self.start_service('compute')

def _wait_for_state(self, ctxt, instance_id, predicate):
"""Wait for a stopped instance to be a given state"""
id = ec2utils.ec2_id_to_id(instance_id)
while True:
info = self.cloud.compute_api.get(context=ctxt, instance_id=id)
LOG.debug(info)
if predicate(info):
break
greenthread.sleep(0.5)

def _wait_for_running(self, instance_id):
def is_running(info):
vm_state = info["vm_state"]
task_state = info["task_state"]
return vm_state == vm_states.ACTIVE and task_state == None
self._wait_for_state(self.context, instance_id, is_running)

def _wait_for_stopped(self, instance_id):
def is_stopped(info):
vm_state = info["vm_state"]
task_state = info["task_state"]
return vm_state == vm_states.STOPPED and task_state == None
self._wait_for_state(self.context, instance_id, is_stopped)

def _wait_for_terminate(self, instance_id):
def is_deleted(info):
return info['deleted']
id = ec2utils.ec2_id_to_id(instance_id)
# NOTE(vish): Wait for InstanceNotFound, then verify that
# the instance is actually deleted.
while True:
try:
self.cloud.compute_api.get(self.context, instance_id=id)
except exception.InstanceNotFound:
break
greenthread.sleep(0.1)

elevated = self.context.elevated(read_deleted=True)
self._wait_for_state(elevated, instance_id, is_deleted)

def test_stop_start_instance(self):
"""Makes sure stop/start instance works"""
# enforce periodic tasks run in short time to avoid wait for 60s.
@@ -1317,23 +1285,20 @@ def test_stop_start_instance(self):
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1, }
instance_id = self._run_instance_wait(**kwargs)
instance_id = self._run_instance(**kwargs)

# a running instance can't be started. It is just ignored.
result = self.cloud.start_instances(self.context, [instance_id])
self.assertTrue(result)

result = self.cloud.stop_instances(self.context, [instance_id])
self.assertTrue(result)
self._wait_for_stopped(instance_id)

result = self.cloud.start_instances(self.context, [instance_id])
self.assertTrue(result)
self._wait_for_running(instance_id)

result = self.cloud.stop_instances(self.context, [instance_id])
self.assertTrue(result)
self._wait_for_stopped(instance_id)

result = self.cloud.terminate_instances(self.context, [instance_id])
self.assertTrue(result)
@@ -1379,7 +1344,7 @@ def test_stop_start_with_volume(self):
'volume_id': vol2['id'],
'delete_on_termination': True},
]}
ec2_instance_id = self._run_instance_wait(**kwargs)
ec2_instance_id = self._run_instance(**kwargs)
instance_id = ec2utils.ec2_id_to_id(ec2_instance_id)

vols = db.volume_get_all_by_instance(self.context, instance_id)
@@ -1395,15 +1360,13 @@

result = self.cloud.stop_instances(self.context, [ec2_instance_id])
self.assertTrue(result)
self._wait_for_stopped(ec2_instance_id)

vol = db.volume_get(self.context, vol1['id'])
self._assert_volume_detached(vol)
vol = db.volume_get(self.context, vol2['id'])
self._assert_volume_detached(vol)

self.cloud.start_instances(self.context, [ec2_instance_id])
self._wait_for_running(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 2)
for vol in vols:
@@ -1415,7 +1378,6 @@ def test_stop_with_attached_volume(self):
self.assertEqual(vol['attach_status'], "attached")

self.cloud.terminate_instances(self.context, [ec2_instance_id])
greenthread.sleep(0.3)

admin_ctxt = context.get_admin_context(read_deleted=False)
vol = db.volume_get(admin_ctxt, vol1['id'])
@@ -1442,7 +1404,7 @@ def test_stop_with_attached_volume(self):
'block_device_mapping': [{'device_name': '/dev/vdb',
'volume_id': vol1['id'],
'delete_on_termination': True}]}
ec2_instance_id = self._run_instance_wait(**kwargs)
ec2_instance_id = self._run_instance(**kwargs)
instance_id = ec2utils.ec2_id_to_id(ec2_instance_id)

vols = db.volume_get_all_by_instance(self.context, instance_id)
@@ -1458,26 +1420,22 @@ def test_stop_with_attached_volume(self):
instance_id=instance_id,
volume_id=vol2['id'],
device='/dev/vdc')
greenthread.sleep(0.3)
vol = db.volume_get(self.context, vol2['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdc')

self.cloud.compute_api.detach_volume(self.context,
volume_id=vol1['id'])
greenthread.sleep(0.3)
vol = db.volume_get(self.context, vol1['id'])
self._assert_volume_detached(vol)

result = self.cloud.stop_instances(self.context, [ec2_instance_id])
self.assertTrue(result)
self._wait_for_stopped(ec2_instance_id)

for vol_id in (vol1['id'], vol2['id']):
vol = db.volume_get(self.context, vol_id)
self._assert_volume_detached(vol)

self.cloud.start_instances(self.context, [ec2_instance_id])
self._wait_for_running(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 1)
for vol in vols:
@@ -1488,7 +1446,6 @@ def test_stop_with_attached_volume(self):
self._assert_volume_detached(vol)

self.cloud.terminate_instances(self.context, [ec2_instance_id])
greenthread.sleep(0.3)

for vol_id in (vol1['id'], vol2['id']):
vol = db.volume_get(self.context, vol_id)
Expand All @@ -1501,7 +1458,6 @@ def test_stop_with_attached_volume(self):
def _create_snapshot(self, ec2_volume_id):
result = self.cloud.create_snapshot(self.context,
volume_id=ec2_volume_id)
greenthread.sleep(0.3)
return result['snapshotId']

def test_run_with_snapshot(self):
@@ -1523,7 +1479,7 @@ def test_run_with_snapshot(self):
{'device_name': '/dev/vdc',
'snapshot_id': snapshot2_id,
'delete_on_termination': True}]}
ec2_instance_id = self._run_instance_wait(**kwargs)
ec2_instance_id = self._run_instance(**kwargs)
instance_id = ec2utils.ec2_id_to_id(ec2_instance_id)

vols = db.volume_get_all_by_instance(self.context, instance_id)
@@ -1547,7 +1503,6 @@ def test_run_with_snapshot(self):
self.assertTrue(vol2_id)

self.cloud.terminate_instances(self.context, [ec2_instance_id])
self._wait_for_terminate(ec2_instance_id)

admin_ctxt = context.get_admin_context(read_deleted=False)
vol = db.volume_get(admin_ctxt, vol1_id)
@@ -1574,7 +1529,7 @@ def test_create_image(self):
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1}
ec2_instance_id = self._run_instance_wait(**kwargs)
ec2_instance_id = self._run_instance(**kwargs)

# TODO(yamahata): s3._s3_create() can't be tested easily by unit test
# as there is no unit test for s3.create()
