Skip to content

Commit

Permalink
ec2_snapshot_info/tests: add unit-tests (#1211)
Browse files Browse the repository at this point in the history
ec2_snapshot_info/tests: add unit-tests

Depends-On: #1234
Depends-On: #1235
SUMMARY


Add the unit-test coverage of the ec2_snapshot_info module.
break up list_ec2_snapshots to move connection.describe_snapshots to  separate method.


COMPONENT NAME

ec2_snapshot_info
ADDITIONAL INFO
latest CI run coverage report  plugins_modules_ec2_snapshot_info_py.html

Reviewed-by: Gonéri Le Bouder <goneri@lebouder.net>
Reviewed-by: Mandar Kulkarni <mandar242@gmail.com>
  • Loading branch information
mandar242 committed Nov 2, 2022
1 parent ac164b6 commit a29bacb
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
---
minor_changes:
- "ec2_snapshot_info - Add unit-tests coverage (https://github.com/ansible-collections/amazon.aws/pull/1211)."
67 changes: 43 additions & 24 deletions plugins/modules/ec2_snapshot_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,30 +219,38 @@
from ansible_collections.amazon.aws.plugins.module_utils.ec2 import boto3_tag_list_to_ansible_dict


def list_ec2_snapshots(connection, module):

snapshot_ids = module.params.get("snapshot_ids")
owner_ids = [str(owner_id) for owner_id in module.params.get("owner_ids")]
restorable_by_user_ids = [str(user_id) for user_id in module.params.get("restorable_by_user_ids")]
filters = ansible_dict_to_boto3_filter_list(module.params.get("filters"))
max_results = module.params.get('max_results')
next_token = module.params.get('next_token_id')
optional_param = {}
if max_results:
optional_param['MaxResults'] = max_results
if next_token:
optional_param['NextToken'] = next_token
def build_request_args(snapshot_ids, owner_ids, restorable_by_user_ids, filters, max_results, next_token_id):

request_args = {
'Filters': ansible_dict_to_boto3_filter_list(filters),
'MaxResults': max_results,
'NextToken': next_token_id,
'OwnerIds': owner_ids,
'RestorableByUserIds': [str(user_id) for user_id in restorable_by_user_ids],
'SnapshotIds': snapshot_ids
}

request_args = {k: v for k, v in request_args.items() if v}

return request_args


def get_snapshots(connection, module, request_args):
snapshot_ids = request_args.get("snapshot_ids")
try:
snapshots = connection.describe_snapshots(
aws_retry=True,
SnapshotIds=snapshot_ids, OwnerIds=owner_ids,
RestorableByUserIds=restorable_by_user_ids, Filters=filters,
**optional_param)
snapshots = connection.describe_snapshots(aws_retry=True, **request_args)
except is_boto3_error_code('InvalidSnapshot.NotFound') as e:
if len(snapshot_ids) > 1:
module.warn("Some of your snapshots may exist, but %s" % str(e))
snapshots = {'Snapshots': []}

return snapshots


def list_ec2_snapshots(connection, module, request_args):

try:
snapshots = get_snapshots(connection, module, request_args)
except ClientError as e: # pylint: disable=duplicate-except
module.fail_json_aws(e, msg='Failed to describe snapshots')

Expand All @@ -262,18 +270,18 @@ def list_ec2_snapshots(connection, module):
if snapshots.get('NextToken'):
result.update(camel_dict_to_snake_dict({'NextTokenId': snapshots.get('NextToken')}))

module.exit_json(**result)
return result


def main():

argument_spec = dict(
snapshot_ids=dict(default=[], type='list', elements='str'),
owner_ids=dict(default=[], type='list', elements='str'),
restorable_by_user_ids=dict(default=[], type='list', elements='str'),
filters=dict(default={}, type='dict'),
max_results=dict(type='int'),
next_token_id=dict(type='str')
next_token_id=dict(type='str'),
owner_ids=dict(default=[], type='list', elements='str'),
restorable_by_user_ids=dict(default=[], type='list', elements='str'),
snapshot_ids=dict(default=[], type='list', elements='str'),
)

module = AnsibleAWSModule(
Expand All @@ -288,7 +296,18 @@ def main():

connection = module.client('ec2', retry_decorator=AWSRetry.jittered_backoff())

list_ec2_snapshots(connection, module)
request_args = build_request_args(
filters=module.params["filters"],
max_results=module.params["max_results"],
next_token_id=module.params["next_token_id"],
owner_ids=module.params["owner_ids"],
restorable_by_user_ids=module.params["restorable_by_user_ids"],
snapshot_ids=module.params["snapshot_ids"],
)

result = list_ec2_snapshots(connection, module, request_args)

module.exit_json(**result)


if __name__ == '__main__':
Expand Down
125 changes: 125 additions & 0 deletions tests/unit/plugins/modules/test_ec2_snapshot_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# (c) 2022 Red Hat Inc.

# This file is part of Ansible
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from unittest.mock import MagicMock, Mock, patch, ANY, call
import pytest

from ansible_collections.amazon.aws.plugins.modules import ec2_snapshot_info

module_name = "ansible_collections.amazon.aws.plugins.modules.ec2_snapshot_info"


@pytest.mark.parametrize("snapshot_ids,owner_ids,restorable_by_user_ids,filters,max_results,next_token_id,expected", [([], [], [], {}, None, None, {})])
def test_build_request_args(snapshot_ids, owner_ids, restorable_by_user_ids, filters, max_results, next_token_id, expected):
assert ec2_snapshot_info.build_request_args(snapshot_ids, owner_ids, restorable_by_user_ids, filters, max_results, next_token_id) == expected


def test_get_snapshots():
module = MagicMock()
connection = MagicMock()

connection.describe_snapshots.return_value = {
"Snapshots": [
{
"Description": "Created by CreateImage(i-083b9dd1234567890) for ami-01486e111234567890",
"Encrypted": False,
"OwnerId": "123456789000",
"Progress": "100%",
"SnapshotId": "snap-0f00cba1234567890",
"StartTime": "2021-09-30T01:04:49.724000+00:00",
"State": "completed",
"StorageTier": "standard",
"Tags": [
{
'Key': 'TagKey',
'Value': 'TagValue'
},
],
"VolumeId": "vol-0ae6c5e1234567890",
"VolumeSize": 10
},
{
"Description": "Created by CreateImage(i-083b9dd1234567890) for ami-01486e111234567890",
"Encrypted": False,
"OwnerId": "123456789000",
"Progress": "100%",
"SnapshotId": "snap-0f00cba1234567890",
"StartTime": "2021-09-30T01:04:49.724000+00:00",
"State": "completed",
"StorageTier": "standard",
"Tags": [
{
'Key': 'TagKey',
'Value': 'TagValue'
},
],
"VolumeId": "vol-0ae6c5e1234567890",
"VolumeSize": 10
}
]}

request_args = {
"SnapshotIds": ["snap-0f00cba1234567890"]
}

snapshot_info = ec2_snapshot_info.get_snapshots(connection, module, request_args)

assert connection.describe_snapshots.call_count == 1
connection.describe_snapshots.assert_called_with(aws_retry=True, SnapshotIds=["snap-0f00cba1234567890"])
assert len(snapshot_info['Snapshots']) == 2


@patch(module_name + ".build_request_args")
@patch(module_name + ".get_snapshots")
def test_list_ec2_snapshots(m_get_snapshots, m_build_request_args):
module = MagicMock()
connection = MagicMock()

m_get_snapshots.return_value = {
"Snapshots": [
{
"Description": "Created by CreateImage(i-083b9dd1234567890) for ami-01486e111234567890",
"Encrypted": False,
"OwnerId": "123456789000",
"Progress": "100%",
"SnapshotId": "snap-0f00cba1234567890",
"StartTime": "2021-09-30T01:04:49.724000+00:00",
"State": "completed",
"StorageTier": "standard",
"Tags": [
{
'Key': 'TagKey',
'Value': 'TagValue'
},
],
"VolumeId": "vol-0ae6c5e1234567890",
"VolumeSize": 10
}
]}

m_build_request_args.return_value = {
'SnapshotIds': ["snap-0f00cba1234567890"]
}

request_args = ec2_snapshot_info.build_request_args()

ec2_snapshot_info.list_ec2_snapshots(connection, module, request_args)

assert m_get_snapshots.call_count == 1
m_get_snapshots.assert_has_calls(
[
call(connection, module, m_build_request_args.return_value),
]
)


@patch(module_name + ".AnsibleAWSModule")
def test_main_success(m_AnsibleAWSModule):
m_module = MagicMock()
m_AnsibleAWSModule.return_value = m_module

ec2_snapshot_info.main()

m_module.client.assert_called_with("ec2", retry_decorator=ANY)

0 comments on commit a29bacb

Please sign in to comment.