Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ec2_security_group - refacter get_target_from_rule() #1221

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelogs/fragments/1221-ec2_security_group.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
minor_changes:
- ec2_security_group - refacter ``get_target_from_rule()`` (https://github.com/ansible-collections/amazon.aws/pull/1221).
277 changes: 163 additions & 114 deletions plugins/modules/ec2_security_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,6 @@
import re
from collections import namedtuple
from copy import deepcopy
from ipaddress import IPv6Network
from ipaddress import ip_network
from time import sleep

Expand Down Expand Up @@ -694,6 +693,116 @@ def validate_rule(rule):
raise SecurityGroupError(msg='Specify proto: icmp or icmpv6 when using icmp_type/icmp_code')


def _target_from_rule_with_group_id(rule, groups):
owner_id = current_account_id
FOREIGN_SECURITY_GROUP_REGEX = r'^([^/]+)/?(sg-\S+)?/(\S+)'
foreign_rule = re.match(FOREIGN_SECURITY_GROUP_REGEX, rule['group_id'])

if not foreign_rule:
return 'group', (owner_id, rule['group_id'], None), False

# this is a foreign Security Group. Since you can't fetch it you must create an instance of it
# Matches on groups like amazon-elb/sg-5a9c116a/amazon-elb-sg, amazon-elb/amazon-elb-sg,
# and peer-VPC groups like 0987654321/sg-1234567890/example
owner_id, group_id, group_name = foreign_rule.groups()
group_instance = dict(UserId=owner_id, GroupId=group_id, GroupName=group_name)
groups[group_id] = group_instance
groups[group_name] = group_instance
if group_id and group_name:
if group_name.startswith('amazon-'):
# amazon-elb and amazon-prefix rules don't need group_id specified,
group_id = None
else:
# For cross-VPC references we'll use group_id as it is more specific
group_name = None
return 'group', (owner_id, group_id, group_name), False


def _lookup_target_or_fail(client, group_name, vpc_id, groups, msg):
owner_id = current_account_id
filters = {'group-name': group_name}
if vpc_id:
filters['vpc-id'] = vpc_id

filters = ansible_dict_to_boto3_filter_list(filters)
try:
found_group = get_security_groups_with_backoff(client, Filters=filters).get('SecurityGroups', [])[0]
except (is_boto3_error_code('InvalidGroup.NotFound'), IndexError):
raise SecurityGroupError(msg=msg)
except (BotoCoreError, ClientError) as e: # pylint: disable=duplicate-except
raise SecurityGroupError(msg="Failed to get security group", e=e)

group_id = found_group['GroupId']
groups[group_id] = found_group
groups[group_name] = found_group
return 'group', (owner_id, group_id, None), False


def _create_target_from_rule(client, rule, groups, vpc_id, check_mode):
owner_id = current_account_id
# We can't create a group in check mode...
if check_mode:
return 'group', (owner_id, None, None), True

group_name = rule['group_name']

try:
created_group = _create_security_group_with_wait(client, group_name, rule['group_desc'], vpc_id)
except is_boto3_error_code('InvalidGroup.Duplicate'):
# The group exists, but didn't show up in any of our previous describe-security-groups calls
# Try searching on a filter for the name, and allow a retry window for AWS to update
# the model on their end.
fail_msg = "Could not create or use existing group '{0}' in rule {1}. " \
"Make sure the group exists and try using the group_id " \
"instead of the name".format(group_name, rule)
return _lookup_target_or_fail(client, group_name, vpc_id, groups, fail_msg)
except (BotoCoreError, ClientError) as e:
raise SecurityGroupError(msg="Failed to create security group '{0}' in rule {1}", e=e)

group_id = created_group['GroupId']
groups[group_id] = created_group
groups[group_name] = created_group

return 'group', (owner_id, group_id, None), True


def _target_from_rule_with_group_name(client, rule, name, group, groups, vpc_id, check_mode):
group_name = rule['group_name']
owner_id = current_account_id
if group_name == name:
# Simplest case, the rule references itself
group_id = group['GroupId']
groups[group_id] = group
groups[group_name] = group
return 'group', (owner_id, group_id, None), False

# Already cached groups
if group_name in groups and group.get('VpcId') and groups[group_name].get('VpcId'):
# both are VPC groups, this is ok
group_id = groups[group_name]['GroupId']
return 'group', (owner_id, group_id, None), False

if group_name in groups and not (group.get('VpcId') or groups[group_name].get('VpcId')):
# both are EC2 classic, this is ok
group_id = groups[group_name]['GroupId']
return 'group', (owner_id, group_id, None), False

# if we got here, either the target group does not exist, or there
# is a mix of EC2 classic + VPC groups. Mixing of EC2 classic + VPC
# is bad, so we have to create a new SG because no compatible group
# exists

# Without a group description we can't create a new group, try looking up the group, or fail
# with a descriptive error message
if not rule.get('group_desc', '').strip():
# retry describing the group
fail_msg = "group '{0}' not found and would be automatically created by rule {1} but " \
"no description was provided".format(group_name, rule)
return _lookup_target_or_fail(client, group_name, vpc_id, groups, fail_msg)

return _create_target_from_rule(client, rule, groups, vpc_id, check_mode)


def get_target_from_rule(module, client, rule, name, group, groups, vpc_id):
"""
Returns tuple of (target_type, target, group_created) after validating rule params.
Expand All @@ -711,101 +820,21 @@ def get_target_from_rule(module, client, rule, name, group, groups, vpc_id):
values that will be compared to current_rules (from current_ingress and
current_egress) in wait_for_rule_propagation().
"""
FOREIGN_SECURITY_GROUP_REGEX = r'^([^/]+)/?(sg-\S+)?/(\S+)'
owner_id = current_account_id
group_id = None
group_name = None
target_group_created = False

try:
validate_rule(rule)
if rule.get('group_id'):
return _target_from_rule_with_group_id(rule, groups)
elif 'group_name' in rule:
return _target_from_rule_with_group_name(client, rule, name, group, groups, vpc_id, module.check_mode)
elif 'cidr_ip' in rule:
return 'ipv4', validate_ip(module, rule['cidr_ip']), False
elif 'cidr_ipv6' in rule:
return 'ipv6', validate_ip(module, rule['cidr_ipv6']), False
elif 'ip_prefix' in rule:
return 'ip_prefix', rule['ip_prefix'], False
except SecurityGroupError as e:
e.fail(module)

if rule.get('group_id') and re.match(FOREIGN_SECURITY_GROUP_REGEX, rule['group_id']):
# this is a foreign Security Group. Since you can't fetch it you must create an instance of it
# Matches on groups like amazon-elb/sg-5a9c116a/amazon-elb-sg, amazon-elb/amazon-elb-sg,
# and peer-VPC groups like 0987654321/sg-1234567890/example
owner_id, group_id, group_name = re.match(FOREIGN_SECURITY_GROUP_REGEX, rule['group_id']).groups()
group_instance = dict(UserId=owner_id, GroupId=group_id, GroupName=group_name)
groups[group_id] = group_instance
groups[group_name] = group_instance
if group_id and group_name:
if group_name.startswith('amazon-'):
# amazon-elb and amazon-prefix rules don't need group_id specified,
group_id = None
else:
# group_id/group_name are mutually exclusive - give group_id more precedence as it is more specific
group_name = None
return 'group', (owner_id, group_id, group_name), False
elif 'group_id' in rule:
return 'group', (owner_id, rule['group_id'], None), False
elif 'group_name' in rule:
group_name = rule['group_name']
if group_name == name:
group_id = group['GroupId']
groups[group_id] = group
groups[group_name] = group
elif group_name in groups and group.get('VpcId') and groups[group_name].get('VpcId'):
# both are VPC groups, this is ok
group_id = groups[group_name]['GroupId']
elif group_name in groups and not (group.get('VpcId') or groups[group_name].get('VpcId')):
# both are EC2 classic, this is ok
group_id = groups[group_name]['GroupId']
else:
auto_group = None
filters = {'group-name': group_name}
if vpc_id:
filters['vpc-id'] = vpc_id
# if we got here, either the target group does not exist, or there
# is a mix of EC2 classic + VPC groups. Mixing of EC2 classic + VPC
# is bad, so we have to create a new SG because no compatible group
# exists
if not rule.get('group_desc', '').strip():
# retry describing the group once
try:
auto_group = get_security_groups_with_backoff(client, Filters=ansible_dict_to_boto3_filter_list(filters)).get('SecurityGroups', [])[0]
except (is_boto3_error_code('InvalidGroup.NotFound'), IndexError):
module.fail_json(msg="group %s will be automatically created by rule %s but "
"no description was provided" % (group_name, rule))
except ClientError as e: # pylint: disable=duplicate-except
module.fail_json_aws(e)
elif not module.check_mode:
params = dict(GroupName=group_name, Description=rule['group_desc'])
if vpc_id:
params['VpcId'] = vpc_id
try:
auto_group = client.create_security_group(aws_retry=True, **params)
get_waiter(
client, 'security_group_exists',
).wait(
GroupIds=[auto_group['GroupId']],
)
except is_boto3_error_code('InvalidGroup.Duplicate'):
# The group exists, but didn't show up in any of our describe-security-groups calls
# Try searching on a filter for the name, and allow a retry window for AWS to update
# the model on their end.
try:
auto_group = get_security_groups_with_backoff(client, Filters=ansible_dict_to_boto3_filter_list(filters)).get('SecurityGroups', [])[0]
except IndexError:
module.fail_json(msg="Could not create or use existing group '{0}' in rule. Make sure the group exists".format(group_name))
except ClientError as e:
module.fail_json_aws(
e,
msg="Could not create or use existing group '{0}' in rule. Make sure the group exists".format(group_name))
if auto_group is not None:
group_id = auto_group['GroupId']
groups[group_id] = auto_group
groups[group_name] = auto_group
target_group_created = True
return 'group', (owner_id, group_id, None), target_group_created
elif 'cidr_ip' in rule:
return 'ipv4', validate_ip(module, rule['cidr_ip']), False
elif 'cidr_ipv6' in rule:
return 'ipv6', validate_ip(module, rule['cidr_ipv6']), False
elif 'ip_prefix' in rule:
return 'ip_prefix', rule['ip_prefix'], False

module.fail_json(msg="Could not match target for rule", failed_rule=rule)


Expand Down Expand Up @@ -977,30 +1006,36 @@ def authorize(client, module, ip_permissions, group_id, rule_type):

def validate_ip(module, cidr_ip):
split_addr = cidr_ip.split('/')
if len(split_addr) == 2:
# this_ip is a IPv4 or IPv6 CIDR that may or may not have host bits set
# Get the network bits if IPv4, and validate if IPv6.
try:
ip = to_subnet(split_addr[0], split_addr[1])
if ip != cidr_ip:
module.warn("One of your CIDR addresses ({0}) has host bits set. To get rid of this warning, "
"check the network mask and make sure that only network bits are set: {1}.".format(
cidr_ip, ip))
except ValueError:
# to_subnet throws a ValueError on IPv6 networks, so we should be working with v6 if we get here
try:
isinstance(ip_network(to_text(cidr_ip)), IPv6Network)
ip = cidr_ip
except ValueError:
# If a host bit is set on something other than a /128, IPv6Network will throw a ValueError
# The ipv6_cidr in this case probably looks like "2001:DB8:A0B:12F0::1/64" and we just want the network bits
ip6 = to_ipv6_subnet(split_addr[0]) + "/" + split_addr[1]
if ip6 != cidr_ip:
module.warn("One of your IPv6 CIDR addresses ({0}) has host bits set. To get rid of this warning, "
"check the network mask and make sure that only network bits are set: {1}.".format(cidr_ip, ip6))
return ip6
if len(split_addr) != 2:
return cidr_ip

try:
ip = ip_network(to_text(cidr_ip))
return str(ip)
except ValueError:
# If a host bit is incorrectly set, ip_network will throw an error at us,
# we'll continue, convert the address to a CIDR AWS will accept, but issue a warning.
pass

# Try evaluating as an IPv4 network, it'll throw a ValueError if it can't parse cidr_ip as an
# IPv4 network
try:
ip = to_subnet(split_addr[0], split_addr[1])
module.warn("One of your CIDR addresses ({0}) has host bits set. To get rid of this warning, "
"check the network mask and make sure that only network bits are set: {1}.".format(cidr_ip, ip))
return ip
return cidr_ip
except ValueError:
pass

# Try again, evaluating as an IPv6 network.
try:
ip6 = to_ipv6_subnet(split_addr[0]) + "/" + split_addr[1]
module.warn("One of your IPv6 CIDR addresses ({0}) has host bits set. To get rid of this warning, "
"check the network mask and make sure that only network bits are set: {1}.".format(cidr_ip, ip6))
return ip6
except ValueError:
module.warn("Unable to parse CIDR ({0}).".format(cidr_ip))
return cidr_ip


def update_tags(client, module, group_id, current_tags, tags, purge_tags):
Expand Down Expand Up @@ -1048,6 +1083,20 @@ def update_rule_descriptions(module, client, group_id, present_ingress, named_tu
return changed


def _create_security_group_with_wait(client, name, description, vpc_id):
params = dict(GroupName=name, Description=description)
if vpc_id:
params['VpcId'] = vpc_id

created_group = client.create_security_group(aws_retry=True, **params)
get_waiter(
client, 'security_group_exists',
).wait(
GroupIds=[created_group['GroupId']],
)
return created_group


def create_security_group(client, module, name, description, vpc_id):
if not module.check_mode:
params = dict(GroupName=name, Description=description)
Expand Down
Loading