-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
- Loading branch information
1 parent
2bfa3ff
commit fdc42e9
Showing
5 changed files
with
189 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
""" | ||
Utilities | ||
""" | ||
|
||
|
||
import re | ||
from typing import List, Tuple, Union | ||
|
||
SUB_PATTERN = re.compile(r"\${(?P<ref>[^}]+)}") | ||
|
||
|
||
class Value: | ||
|
||
id = "" # noqa: VNE003 | ||
references = None | ||
|
||
def __init__(self, value: Union[dict, str]): | ||
""" | ||
Parse a CloudFormation value | ||
This handles intrinsic functions, such as 'Fn::Sub' and 'Fn::Join' and | ||
returns an object that contains both a uniquely identifiable string and | ||
references to other resources. | ||
""" | ||
|
||
self.references = [] | ||
|
||
# String | ||
if isinstance(value, str): | ||
self.id = value | ||
|
||
# Not a dict - return an error here | ||
elif not isinstance(value, dict): | ||
raise ValueError("'value' should be of type str or dict, got '%s'" % type(value)) | ||
|
||
# 'Ref' intrinsic function | ||
elif "Ref" in value: | ||
self.id, self.references = self._get_from_ref(value["Ref"]) | ||
|
||
# 'Fn::GetAtt' intrinsic function | ||
elif "Fn::GetAtt" in value: | ||
self.id, self.references = self._get_from_getatt(value["Fn::GetAtt"]) | ||
|
||
# 'Fn::Join' intrisic function | ||
elif "Fn::Join" in value: | ||
self.id, self.references = self._get_from_join(value["Fn::Join"]) | ||
|
||
# 'Fn::Sub' intrisic function | ||
elif "Fn::Sub" in value: | ||
self.id, self.references = self._get_from_sub(value["Fn::Sub"]) | ||
|
||
def _get_from_ref(self, value: str) -> Tuple[str, List[str]]: | ||
""" | ||
Return the name and references from a 'Ref' intrinsic function | ||
""" | ||
|
||
return [value, [value]] | ||
|
||
def _get_from_getatt(self, value: list) -> Tuple[str, List[str]]: | ||
""" | ||
Return the name and references from a 'Fn::GetAtt' intrinsic function | ||
""" | ||
|
||
id_ = ".".join(value) | ||
references = [value[0]] | ||
|
||
return (id_, references) | ||
|
||
def _get_from_join(self, value: list) -> Tuple[str, List[str]]: | ||
""" | ||
Return the name and references from a 'Fn::Join' intrinsic function | ||
""" | ||
|
||
delimiter = value[0] | ||
# Using Value() here to get nested references | ||
sub_values = [Value(v) for v in value[1]] | ||
|
||
id_ = delimiter.join([v.id for v in sub_values]) | ||
references = [] | ||
for sub_value in sub_values: | ||
references.extend(sub_value.references) | ||
|
||
return (id_, references) | ||
|
||
def _get_from_sub(self, value: Union[str, list]) -> Tuple[str, List[str]]: | ||
""" | ||
Return the name and references from a 'Fn::Sub' intrinsic function | ||
""" | ||
|
||
pattern = value | ||
variables = {} | ||
|
||
if isinstance(value, list): | ||
pattern = value[0] | ||
# Using Value() here to get nested references | ||
variables = {k: Value(v) for k, v in value[1].items()} | ||
|
||
references = [] | ||
|
||
for match in SUB_PATTERN.findall(pattern): | ||
if match in variables: | ||
references.extend(variables[match].references) | ||
else: | ||
references.append(match) | ||
|
||
return (pattern, references) |
24 changes: 24 additions & 0 deletions
24
cfn-lint-serverless/tests/templates/ws1002-sub-principal.pass.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
# Test for https://github.com/awslabs/serverless-rules/issues/78 | ||
AWSTemplateFormatVersion: "2010-09-09" | ||
Transform: "AWS::Serverless-2016-10-31" | ||
|
||
Parameters: | ||
ProjectId: | ||
Type: String | ||
|
||
Resources: | ||
Function: | ||
Type: AWS::Serverless::Function | ||
Properties: | ||
CodeUri: . | ||
Runtime: python3.8 | ||
Handler: main.handler | ||
Tracing: Active | ||
|
||
Permission: | ||
Type: AWS::Lambda::Permission | ||
Properties: | ||
FunctionName: !GetAtt Function.Arn | ||
Principal: !Sub pinpoint.${AWS::Region}.amazonaws.com | ||
Action: lambda:Invoke | ||
SourceArn: !Sub 'arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:/apps/${ProjectId}*' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,6 @@ | ||
#!/usr/bin/env python3 | ||
""" | ||
Testing templates | ||
""" | ||
|
||
|
||
import collections | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
""" | ||
Testing utility functions | ||
""" | ||
|
||
|
||
import pytest | ||
|
||
from cfn_lint_serverless import utils | ||
|
||
value_test_cases = [ | ||
# str | ||
{"input": "MyString", "id": "MyString", "references": []}, | ||
# Ref | ||
{"input": {"Ref": "MyResource"}, "id": "MyResource", "references": ["MyResource"]}, | ||
# Fn::GetAtt | ||
{"input": {"Fn::GetAtt": ["MyResource", "Arn"]}, "id": "MyResource.Arn", "references": ["MyResource"]}, | ||
# Fn::Join | ||
{"input": {"Fn::Join": ["/", ["ABC", "DEF"]]}, "id": "ABC/DEF", "references": []}, | ||
# Fn::Join with references | ||
{ | ||
"input": {"Fn::Join": ["/", ["ABC", {"Ref": "MyResource"}]]}, | ||
"id": "ABC/MyResource", | ||
"references": ["MyResource"], | ||
}, | ||
# Fn::Sub | ||
{"input": {"Fn::Sub": "abc-${MyResource}"}, "id": "abc-${MyResource}", "references": ["MyResource"]}, | ||
# Fn::Sub with variables | ||
{"input": {"Fn::Sub": ["abc-${MyVar}", {"MyVar": "MyResource"}]}, "id": "abc-${MyVar}", "references": []}, | ||
# Fn::Sub with variables and references | ||
{ | ||
"input": {"Fn::Sub": ["abc-${MyVar}", {"MyVar": {"Ref": "MyResource"}}]}, | ||
"id": "abc-${MyVar}", | ||
"references": ["MyResource"], | ||
}, | ||
] | ||
|
||
|
||
@pytest.mark.parametrize("case", value_test_cases) | ||
def test_value(case): | ||
""" | ||
Test Value() | ||
""" | ||
|
||
print(case) | ||
|
||
output = utils.Value(case["input"]) | ||
|
||
assert case["id"] == output.id | ||
assert case["references"] == output.references |