From 8cad3a825821c0c7340fb47106d7143cff801d27 Mon Sep 17 00:00:00 2001 From: Sam Liu Date: Thu, 15 Dec 2022 14:55:32 -0800 Subject: [PATCH] fix: Raise correct exception FindInMap used on parameters not a 2-level-map --- samtranslator/intrinsics/actions.py | 113 +++++++++++------- tests/intrinsics/test_actions.py | 13 -- .../error_invalid_mapping_by_findinmap.yaml | 31 +++++ .../error_invalid_mapping_by_findinmap.json | 3 + 4 files changed, 105 insertions(+), 55 deletions(-) create mode 100644 tests/translator/input/error_invalid_mapping_by_findinmap.yaml create mode 100644 tests/translator/output/error_invalid_mapping_by_findinmap.json diff --git a/samtranslator/intrinsics/actions.py b/samtranslator/intrinsics/actions.py index ca21c5c54a..37d2542714 100644 --- a/samtranslator/intrinsics/actions.py +++ b/samtranslator/intrinsics/actions.py @@ -1,9 +1,11 @@ import re +from abc import ABC +from typing import Any, Dict, Optional, Tuple from samtranslator.model.exceptions import InvalidTemplateException, InvalidDocumentException -class Action(object): +class Action(ABC): """ Base class for intrinsic function actions. Each intrinsic function must subclass this, override the intrinsic_name, and provide a resolve() method @@ -23,25 +25,29 @@ def __init__(self) -> None: if not self.intrinsic_name: raise TypeError("Subclass must provide a intrinsic_name") - def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-untyped-def] + def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]: """ Subclass must implement this method to resolve the intrinsic function + TODO: input_dict should not be None. """ - raise NotImplementedError("Subclass must implement this method") - def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: ignore[no-untyped-def] + def resolve_resource_refs( + self, input_dict: Optional[Any], supported_resource_refs: Dict[str, Any] + ) -> Optional[Any]: """ Subclass must implement this method to resolve resource references + TODO: input_dict should not be None. """ - raise NotImplementedError("Subclass must implement this method") - def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # type: ignore[no-untyped-def] + def resolve_resource_id_refs( + self, input_dict: Optional[Any], supported_resource_id_refs: Dict[str, Any] + ) -> Optional[Any]: """ Subclass must implement this method to resolve resource references + TODO: input_dict should not be None. """ - raise NotImplementedError("Subclass must implement this method") - def can_handle(self, input_dict): # type: ignore[no-untyped-def] + def can_handle(self, input_dict: Any) -> bool: """ Validates that the input dictionary contains only one key and is of the given intrinsic_name @@ -49,15 +55,10 @@ def can_handle(self, input_dict): # type: ignore[no-untyped-def] :return: True if it matches expected structure, False otherwise """ - return ( - input_dict is not None - and isinstance(input_dict, dict) - and len(input_dict) == 1 - and self.intrinsic_name in input_dict - ) + return isinstance(input_dict, dict) and len(input_dict) == 1 and self.intrinsic_name in input_dict @classmethod - def _parse_resource_reference(cls, ref_value): # type: ignore[no-untyped-def] + def _parse_resource_reference(cls, ref_value: Any) -> Tuple[Optional[str], Optional[str]]: """ Splits a resource reference of structure "LogicalId.Property" and returns the "LogicalId" and "Property" separately. @@ -84,7 +85,7 @@ def _parse_resource_reference(cls, ref_value): # type: ignore[no-untyped-def] class RefAction(Action): intrinsic_name = "Ref" - def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-untyped-def] + def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]: """ Resolves references that are present in the parameters and returns the value. If it is not in parameters, this method simply returns the input unchanged. @@ -95,7 +96,7 @@ def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-unt :param parameters: Dictionary of parameter values for resolution :return: """ - if not self.can_handle(input_dict): # type: ignore[no-untyped-call] + if input_dict is None or not self.can_handle(input_dict): return input_dict param_name = input_dict[self.intrinsic_name] @@ -107,7 +108,9 @@ def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-unt return parameters[param_name] return input_dict - def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: ignore[no-untyped-def] + def resolve_resource_refs( + self, input_dict: Optional[Any], supported_resource_refs: Dict[str, Any] + ) -> Optional[Any]: """ Resolves references to some property of a resource. These are runtime properties which can't be converted to a value here. Instead we output another reference that will more actually resolve to the value when @@ -122,11 +125,11 @@ def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: i :return dict: Dictionary with resource references resolved. """ - if not self.can_handle(input_dict): # type: ignore[no-untyped-call] + if input_dict is None or not self.can_handle(input_dict): return input_dict ref_value = input_dict[self.intrinsic_name] - logical_id, property = self._parse_resource_reference(ref_value) # type: ignore[no-untyped-call] + logical_id, property = self._parse_resource_reference(ref_value) # ref_value could not be parsed if not logical_id: @@ -138,7 +141,9 @@ def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: i return {self.intrinsic_name: resolved_value} - def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # type: ignore[no-untyped-def] + def resolve_resource_id_refs( + self, input_dict: Optional[Any], supported_resource_id_refs: Dict[str, Any] + ) -> Optional[Any]: """ Updates references to the old logical id of a resource to the new (generated) logical id. @@ -150,7 +155,7 @@ def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # t :return dict: Dictionary with resource references resolved. """ - if not self.can_handle(input_dict): # type: ignore[no-untyped-call] + if input_dict is None or not self.can_handle(input_dict): return input_dict ref_value = input_dict[self.intrinsic_name] @@ -169,7 +174,7 @@ def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # t class SubAction(Action): intrinsic_name = "Fn::Sub" - def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-untyped-def] + def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]: """ Substitute references found within the string of `Fn::Sub` intrinsic function @@ -193,7 +198,9 @@ def do_replacement(full_ref, prop_name): # type: ignore[no-untyped-def] return self._handle_sub_action(input_dict, do_replacement) # type: ignore[no-untyped-call] - def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: ignore[no-untyped-def] + def resolve_resource_refs( + self, input_dict: Optional[Any], supported_resource_refs: Dict[str, Any] + ) -> Optional[Any]: """ Resolves reference to some property of a resource. Inside string to be substituted, there could be either a "Ref" or a "GetAtt" usage of this property. They have to be handled differently. @@ -252,7 +259,9 @@ def do_replacement(full_ref, ref_value): # type: ignore[no-untyped-def] return self._handle_sub_action(input_dict, do_replacement) # type: ignore[no-untyped-call] - def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # type: ignore[no-untyped-def] + def resolve_resource_id_refs( + self, input_dict: Optional[Any], supported_resource_id_refs: Dict[str, Any] + ) -> Optional[Any]: """ Resolves reference to some property of a resource. Inside string to be substituted, there could be either a "Ref" or a "GetAtt" usage of this property. They have to be handled differently. @@ -318,7 +327,7 @@ def _handle_sub_action(self, input_dict, handler): # type: ignore[no-untyped-de :param handler: handler that is specific to each implementation. :return: Resolved value of the Sub dictionary """ - if not self.can_handle(input_dict): # type: ignore[no-untyped-call] + if input_dict is None or not self.can_handle(input_dict): return input_dict key = self.intrinsic_name @@ -397,11 +406,13 @@ def handler_method(full_ref, ref_value): class GetAttAction(Action): intrinsic_name = "Fn::GetAtt" - def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-untyped-def] + def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]: # Parameters can never be referenced within GetAtt value return input_dict - def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: ignore[no-untyped-def] + def resolve_resource_refs( + self, input_dict: Optional[Any], supported_resource_refs: Dict[str, Any] + ) -> Optional[Any]: """ Resolve resource references within a GetAtt dict. @@ -426,7 +437,7 @@ def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: i :return: Resolved dictionary """ - if not self.can_handle(input_dict): # type: ignore[no-untyped-call] + if input_dict is None or not self.can_handle(input_dict): return input_dict key = self.intrinsic_name @@ -454,7 +465,9 @@ def resolve_resource_refs(self, input_dict, supported_resource_refs): # type: i resolved_value = supported_resource_refs.get(logical_id, property) return self._get_resolved_dictionary(input_dict, key, resolved_value, remaining) # type: ignore[no-untyped-call] - def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # type: ignore[no-untyped-def] + def resolve_resource_id_refs( + self, input_dict: Optional[Any], supported_resource_id_refs: Dict[str, Any] + ) -> Optional[Any]: """ Resolve resource references within a GetAtt dict. @@ -478,7 +491,7 @@ def resolve_resource_id_refs(self, input_dict, supported_resource_id_refs): # t :return: Resolved dictionary """ - if not self.can_handle(input_dict): # type: ignore[no-untyped-call] + if input_dict is None or not self.can_handle(input_dict): return input_dict key = self.intrinsic_name @@ -533,7 +546,7 @@ class FindInMapAction(Action): intrinsic_name = "Fn::FindInMap" - def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-untyped-def] + def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]: """ Recursively resolves "Fn::FindInMap"references that are present in the mappings and returns the value. If it is not in mappings, this method simply returns the input unchanged. @@ -543,7 +556,7 @@ def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-unt :param parameters: Dictionary of mappings from the SAM template """ - if not self.can_handle(input_dict): # type: ignore[no-untyped-call] + if input_dict is None or not self.can_handle(input_dict): return input_dict value = input_dict[self.intrinsic_name] @@ -558,18 +571,34 @@ def resolve_parameter_refs(self, input_dict, parameters): # type: ignore[no-unt ] ) - map_name = self.resolve_parameter_refs(value[0], parameters) # type: ignore[no-untyped-call] - top_level_key = self.resolve_parameter_refs(value[1], parameters) # type: ignore[no-untyped-call] - second_level_key = self.resolve_parameter_refs(value[2], parameters) # type: ignore[no-untyped-call] + map_name = self.resolve_parameter_refs(value[0], parameters) + top_level_key = self.resolve_parameter_refs(value[1], parameters) + second_level_key = self.resolve_parameter_refs(value[2], parameters) + + if not all(isinstance(key, str) for key in [map_name, top_level_key, second_level_key]): + return input_dict + + invalid_2_level_map_exception = InvalidDocumentException( + [ + InvalidTemplateException( + f"Cannot use {self.intrinsic_name} on Mapping '{map_name}' which is not a a two-level map." + ) + ] + ) + + # We should be able to use dict_deep_get() if + # the behavior of missing key is return None instead of input_dict. + if map_name not in parameters: + return input_dict - if not isinstance(map_name, str) or not isinstance(top_level_key, str) or not isinstance(second_level_key, str): + if not isinstance(parameters[map_name], dict): + raise invalid_2_level_map_exception + if top_level_key not in parameters[map_name]: return input_dict - if ( - map_name not in parameters - or top_level_key not in parameters[map_name] - or second_level_key not in parameters[map_name][top_level_key] - ): + if not isinstance(parameters[map_name][top_level_key], dict): + raise invalid_2_level_map_exception + if second_level_key not in parameters[map_name][top_level_key]: return input_dict return parameters[map_name][top_level_key][second_level_key] diff --git a/tests/intrinsics/test_actions.py b/tests/intrinsics/test_actions.py index abe4afa613..09caa4f66a 100644 --- a/tests/intrinsics/test_actions.py +++ b/tests/intrinsics/test_actions.py @@ -15,19 +15,6 @@ class MyAction(Action): with self.assertRaises(TypeError): MyAction() - def test_subclass_must_implement_resolve_method(self): - class MyAction(Action): - intrinsic_name = "foo" - - with self.assertRaises(NotImplementedError): - MyAction().resolve_parameter_refs({}, {}) - - with self.assertRaises(NotImplementedError): - MyAction().resolve_resource_refs({}, {}) - - with self.assertRaises(NotImplementedError): - MyAction().resolve_resource_id_refs({}, {}) - def test_can_handle_input(self): class MyAction(Action): intrinsic_name = "foo" diff --git a/tests/translator/input/error_invalid_mapping_by_findinmap.yaml b/tests/translator/input/error_invalid_mapping_by_findinmap.yaml new file mode 100644 index 0000000000..f54f62af7b --- /dev/null +++ b/tests/translator/input/error_invalid_mapping_by_findinmap.yaml @@ -0,0 +1,31 @@ +Parameters: + ApplicationIdParam: + Type: String + Default: arn:aws:serverlessrepo:us-east-1:123456789012:applications/hello-world + VersionParam: + Type: String + Default: 1.0.0 + +Mappings: + InvaliMapping: + ap-southeast-1: + - Version: this should not be a list + cn-north-1: + - Version: this should not be a list + us-gov-west-1: + - Version: this should not be a list + +Resources: + + ApplicationFindInInvalidMap: + Type: AWS::Serverless::Application + Properties: + Location: + ApplicationId: !FindInMap + - InvaliMapping + - !Ref 'AWS::Region' + - Version + SemanticVersion: !FindInMap + - InvaliMapping + - !Ref 'AWS::Region' + - Version diff --git a/tests/translator/output/error_invalid_mapping_by_findinmap.json b/tests/translator/output/error_invalid_mapping_by_findinmap.json new file mode 100644 index 0000000000..82fa107f56 --- /dev/null +++ b/tests/translator/output/error_invalid_mapping_by_findinmap.json @@ -0,0 +1,3 @@ +{ + "errorMessage": "Invalid Serverless Application Specification document. Number of errors found: 1. Structure of the SAM template is invalid. Cannot use Fn::FindInMap on Mapping 'InvaliMapping' which is not a a two-level map." +}