Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 71 additions & 42 deletions samtranslator/intrinsics/actions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import re
from abc import ABC
from typing import Any, Dict, Optional, Tuple

from samtranslator.model.exceptions import InvalidTemplateException, InvalidDocumentException


class Action(object):
class Action(ABC):
"""
Base class for intrinsic function actions. Each intrinsic function must subclass this,
override the intrinsic_name, and provide a resolve() method
Expand All @@ -23,41 +25,40 @@ def __init__(self) -> None:
if not self.intrinsic_name:
raise TypeError("Subclass must provide a intrinsic_name")

def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-untyped-def]
def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]:
"""
Subclass must implement this method to resolve the intrinsic function
TODO: input_dict should not be None.
"""
raise NotImplementedError("Subclass must implement this method")

def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: ignore[no-untyped-def]
def resolve_resource_refs(
self, input_dict: Optional[Any], supported_resource_refs: Dict[str, Any]
) -> Optional[Any]:
"""
Subclass must implement this method to resolve resource references
TODO: input_dict should not be None.
"""
raise NotImplementedError("Subclass must implement this method")

def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # type: ignore[no-untyped-def]
def resolve_resource_id_refs(
self, input_dict: Optional[Any], supported_resource_id_refs: Dict[str, Any]
) -> Optional[Any]:
"""
Subclass must implement this method to resolve resource references
TODO: input_dict should not be None.
"""
raise NotImplementedError("Subclass must implement this method")

def can_handle(self, input_dict): # type: ignore[no-untyped-def]
def can_handle(self, input_dict: Any) -> bool:
"""
Validates that the input dictionary contains only one key and is of the given intrinsic_name

:param input_dict: Input dictionary representing the intrinsic function
:return: True if it matches expected structure, False otherwise
"""

return (
input_dict is not None
and isinstance(input_dict, dict)
and len(input_dict) == 1
and self.intrinsic_name in input_dict
)
return isinstance(input_dict, dict) and len(input_dict) == 1 and self.intrinsic_name in input_dict

@classmethod
def _parse_resource_reference(cls, ref_value): # type: ignore[no-untyped-def]
def _parse_resource_reference(cls, ref_value: Any) -> Tuple[Optional[str], Optional[str]]:
"""
Splits a resource reference of structure "LogicalId.Property" and returns the "LogicalId" and "Property"
separately.
Expand All @@ -84,7 +85,7 @@ def _parse_resource_reference(cls, ref_value): # type: ignore[no-untyped-def]
class RefAction(Action):
intrinsic_name = "Ref"

def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-untyped-def]
def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]:
"""
Resolves references that are present in the parameters and returns the value. If it is not in parameters,
this method simply returns the input unchanged.
Expand All @@ -95,7 +96,7 @@ def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-unt
:param parameters: Dictionary of parameter values for resolution
:return:
"""
if not self.can_handle(input_dict): # type: ignore[no-untyped-call]
if input_dict is None or not self.can_handle(input_dict):
return input_dict

param_name = input_dict[self.intrinsic_name]
Expand All @@ -107,7 +108,9 @@ def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-unt
return parameters[param_name]
return input_dict

def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: ignore[no-untyped-def]
def resolve_resource_refs(
self, input_dict: Optional[Any], supported_resource_refs: Dict[str, Any]
) -> Optional[Any]:
"""
Resolves references to some property of a resource. These are runtime properties which can't be converted
to a value here. Instead we output another reference that will more actually resolve to the value when
Expand All @@ -122,11 +125,11 @@ def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: i
:return dict: Dictionary with resource references resolved.
"""

if not self.can_handle(input_dict): # type: ignore[no-untyped-call]
if input_dict is None or not self.can_handle(input_dict):
return input_dict

ref_value = input_dict[self.intrinsic_name]
logical_id, property = self._parse_resource_reference(ref_value) # type: ignore[no-untyped-call]
logical_id, property = self._parse_resource_reference(ref_value)

# ref_value could not be parsed
if not logical_id:
Expand All @@ -138,7 +141,9 @@ def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: i

return {self.intrinsic_name: resolved_value}

def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # type: ignore[no-untyped-def]
def resolve_resource_id_refs(
self, input_dict: Optional[Any], supported_resource_id_refs: Dict[str, Any]
) -> Optional[Any]:
"""
Updates references to the old logical id of a resource to the new (generated) logical id.

Expand All @@ -150,7 +155,7 @@ def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # t
:return dict: Dictionary with resource references resolved.
"""

if not self.can_handle(input_dict): # type: ignore[no-untyped-call]
if input_dict is None or not self.can_handle(input_dict):
return input_dict

ref_value = input_dict[self.intrinsic_name]
Expand All @@ -169,7 +174,7 @@ def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # t
class SubAction(Action):
intrinsic_name = "Fn::Sub"

def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-untyped-def]
def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]:
"""
Substitute references found within the string of `Fn::Sub` intrinsic function

Expand All @@ -193,7 +198,9 @@ def do_replacement(full_ref, prop_name): # type: ignore[no-untyped-def]

return self._handle_sub_action(input_dict, do_replacement) # type: ignore[no-untyped-call]

def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: ignore[no-untyped-def]
def resolve_resource_refs(
self, input_dict: Optional[Any], supported_resource_refs: Dict[str, Any]
) -> Optional[Any]:
"""
Resolves reference to some property of a resource. Inside string to be substituted, there could be either a
"Ref" or a "GetAtt" usage of this property. They have to be handled differently.
Expand Down Expand Up @@ -252,7 +259,9 @@ def do_replacement(full_ref, ref_value): # type: ignore[no-untyped-def]

return self._handle_sub_action(input_dict, do_replacement) # type: ignore[no-untyped-call]

def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # type: ignore[no-untyped-def]
def resolve_resource_id_refs(
self, input_dict: Optional[Any], supported_resource_id_refs: Dict[str, Any]
) -> Optional[Any]:
"""
Resolves reference to some property of a resource. Inside string to be substituted, there could be either a
"Ref" or a "GetAtt" usage of this property. They have to be handled differently.
Expand Down Expand Up @@ -318,7 +327,7 @@ def _handle_sub_action(self, input_dict, handler): # type: ignore[no-untyped-de
:param handler: handler that is specific to each implementation.
:return: Resolved value of the Sub dictionary
"""
if not self.can_handle(input_dict): # type: ignore[no-untyped-call]
if input_dict is None or not self.can_handle(input_dict):
return input_dict

key = self.intrinsic_name
Expand Down Expand Up @@ -397,11 +406,13 @@ def handler_method(full_ref, ref_value):
class GetAttAction(Action):
intrinsic_name = "Fn::GetAtt"

def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-untyped-def]
def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]:
# Parameters can never be referenced within GetAtt value
return input_dict

def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: ignore[no-untyped-def]
def resolve_resource_refs(
self, input_dict: Optional[Any], supported_resource_refs: Dict[str, Any]
) -> Optional[Any]:
"""
Resolve resource references within a GetAtt dict.

Expand All @@ -426,7 +437,7 @@ def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: i
:return: Resolved dictionary
"""

if not self.can_handle(input_dict): # type: ignore[no-untyped-call]
if input_dict is None or not self.can_handle(input_dict):
return input_dict

key = self.intrinsic_name
Expand Down Expand Up @@ -454,7 +465,9 @@ def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: i
resolved_value = supported_resource_refs.get(logical_id, property)
return self._get_resolved_dictionary(input_dict, key, resolved_value, remaining) # type: ignore[no-untyped-call]

def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # type: ignore[no-untyped-def]
def resolve_resource_id_refs(
self, input_dict: Optional[Any], supported_resource_id_refs: Dict[str, Any]
) -> Optional[Any]:
"""
Resolve resource references within a GetAtt dict.

Expand All @@ -478,7 +491,7 @@ def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # t
:return: Resolved dictionary
"""

if not self.can_handle(input_dict): # type: ignore[no-untyped-call]
if input_dict is None or not self.can_handle(input_dict):
return input_dict

key = self.intrinsic_name
Expand Down Expand Up @@ -533,7 +546,7 @@ class FindInMapAction(Action):

intrinsic_name = "Fn::FindInMap"

def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-untyped-def]
def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]:
"""
Recursively resolves "Fn::FindInMap"references that are present in the mappings and returns the value.
If it is not in mappings, this method simply returns the input unchanged.
Expand All @@ -543,7 +556,7 @@ def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-unt

:param parameters: Dictionary of mappings from the SAM template
"""
if not self.can_handle(input_dict): # type: ignore[no-untyped-call]
if input_dict is None or not self.can_handle(input_dict):
return input_dict

value = input_dict[self.intrinsic_name]
Expand All @@ -558,18 +571,34 @@ def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-unt
]
)

map_name = self.resolve_parameter_refs(value[0], parameters) # type: ignore[no-untyped-call]
top_level_key = self.resolve_parameter_refs(value[1], parameters) # type: ignore[no-untyped-call]
second_level_key = self.resolve_parameter_refs(value[2], parameters) # type: ignore[no-untyped-call]
map_name = self.resolve_parameter_refs(value[0], parameters)
top_level_key = self.resolve_parameter_refs(value[1], parameters)
second_level_key = self.resolve_parameter_refs(value[2], parameters)

if not all(isinstance(key, str) for key in [map_name, top_level_key, second_level_key]):
return input_dict

invalid_2_level_map_exception = InvalidDocumentException(
[
InvalidTemplateException(
f"Cannot use {self.intrinsic_name} on Mapping '{map_name}' which is not a a two-level map."
)
]
)

# We should be able to use dict_deep_get() if
# the behavior of missing key is return None instead of input_dict.
if map_name not in parameters:
return input_dict

if not isinstance(map_name, str) or not isinstance(top_level_key, str) or not isinstance(second_level_key, str):
if not isinstance(parameters[map_name], dict):
raise invalid_2_level_map_exception
if top_level_key not in parameters[map_name]:
return input_dict

if (
map_name not in parameters
or top_level_key not in parameters[map_name]
or second_level_key not in parameters[map_name][top_level_key]
):
if not isinstance(parameters[map_name][top_level_key], dict):
raise invalid_2_level_map_exception
if second_level_key not in parameters[map_name][top_level_key]:
return input_dict

return parameters[map_name][top_level_key][second_level_key]
13 changes: 0 additions & 13 deletions tests/intrinsics/test_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,6 @@ class MyAction(Action):
with self.assertRaises(TypeError):
MyAction()

def test_subclass_must_implement_resolve_method(self):
class MyAction(Action):
intrinsic_name = "foo"

with self.assertRaises(NotImplementedError):
MyAction().resolve_parameter_refs({}, {})

with self.assertRaises(NotImplementedError):
MyAction().resolve_resource_refs({}, {})

with self.assertRaises(NotImplementedError):
MyAction().resolve_resource_id_refs({}, {})

def test_can_handle_input(self):
class MyAction(Action):
intrinsic_name = "foo"
Expand Down
31 changes: 31 additions & 0 deletions tests/translator/input/error_invalid_mapping_by_findinmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
Parameters:
ApplicationIdParam:
Type: String
Default: arn:aws:serverlessrepo:us-east-1:123456789012:applications/hello-world
VersionParam:
Type: String
Default: 1.0.0

Mappings:
InvaliMapping:
ap-southeast-1:
- Version: this should not be a list
cn-north-1:
- Version: this should not be a list
us-gov-west-1:
- Version: this should not be a list

Resources:

ApplicationFindInInvalidMap:
Type: AWS::Serverless::Application
Properties:
Location:
ApplicationId: !FindInMap
- InvaliMapping
- !Ref 'AWS::Region'
- Version
SemanticVersion: !FindInMap
- InvaliMapping
- !Ref 'AWS::Region'
- Version
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Structure of the SAM template is invalid. Cannot use Fn::FindInMap on Mapping 'InvaliMapping' which is not a a two-level map."
}