Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 25 additions & 29 deletions samtranslator/intrinsics/actions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
from abc import ABC
from typing import Any, Dict, Optional, Tuple
from typing import Any, Dict, Optional, Tuple, Callable, List

from samtranslator.model.exceptions import InvalidTemplateException, InvalidDocumentException

Expand All @@ -15,15 +15,7 @@ class Action(ABC):
"""

_resource_ref_separator = "."

# Note(xinhol): `Action` should have been an abstract class. Disabling the type check for the next
# line to avoid any potential behavior change.
# TODO: Make `Action` an abstract class and not giving `intrinsic_name` initial value.
intrinsic_name: str = None # type: ignore

def __init__(self) -> None:
if not self.intrinsic_name:
raise TypeError("Subclass must provide a intrinsic_name")
intrinsic_name: str

def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str, Any]) -> Optional[Any]:
"""
Expand Down Expand Up @@ -185,7 +177,7 @@ def resolve_parameter_refs(self, input_dict: Optional[Any], parameters: Dict[str
:return: Resolved
"""

def do_replacement(full_ref, prop_name): # type: ignore[no-untyped-def]
def do_replacement(full_ref: str, prop_name: str) -> Any:
"""
Replace parameter references with actual value. Return value of this method is directly replaces the
reference structure
Expand All @@ -196,7 +188,7 @@ def do_replacement(full_ref, prop_name): # type: ignore[no-untyped-def]
"""
return parameters.get(prop_name, full_ref)

return self._handle_sub_action(input_dict, do_replacement) # type: ignore[no-untyped-call]
return self._handle_sub_action(input_dict, do_replacement)

def resolve_resource_refs(
self, input_dict: Optional[Any], supported_resource_refs: Dict[str, Any]
Expand Down Expand Up @@ -226,7 +218,7 @@ def resolve_resource_refs(
:return: Resolved dictionary
"""

def do_replacement(full_ref, ref_value): # type: ignore[no-untyped-def]
def do_replacement(full_ref: str, ref_value: str) -> str:
"""
Perform the appropriate replacement to handle ${LogicalId.Property} type references inside a Sub.
This method is called to get the replacement string for each reference within Sub's value
Expand Down Expand Up @@ -257,7 +249,7 @@ def do_replacement(full_ref, ref_value): # type: ignore[no-untyped-def]
replacement = self._resource_ref_separator.join([logical_id, property_name])
return full_ref.replace(replacement, resolved_value)

return self._handle_sub_action(input_dict, do_replacement) # type: ignore[no-untyped-call]
return self._handle_sub_action(input_dict, do_replacement)

def resolve_resource_id_refs(
self, input_dict: Optional[Any], supported_resource_id_refs: Dict[str, Any]
Expand Down Expand Up @@ -286,7 +278,7 @@ def resolve_resource_id_refs(
:return: Resolved dictionary
"""

def do_replacement(full_ref, ref_value): # type: ignore[no-untyped-def]
def do_replacement(full_ref: str, ref_value: str) -> str:
"""
Perform the appropriate replacement to handle ${LogicalId} type references inside a Sub.
This method is called to get the replacement string for each reference within Sub's value
Expand Down Expand Up @@ -315,9 +307,11 @@ def do_replacement(full_ref, ref_value): # type: ignore[no-untyped-def]
# syntax and retain other attributes. Ex: ${LogicalId.Property.Arn} => ${SomeOtherLogicalId.Arn}
return full_ref.replace(logical_id, resolved_value)

return self._handle_sub_action(input_dict, do_replacement) # type: ignore[no-untyped-call]
return self._handle_sub_action(input_dict, do_replacement)

def _handle_sub_action(self, input_dict, handler): # type: ignore[no-untyped-def]
def _handle_sub_action(
self, input_dict: Optional[Dict[Any, Any]], handler: Callable[[str, str], str]
) -> Optional[Any]:
"""
Handles resolving replacements in the Sub action based on the handler that is passed as an input.

Expand All @@ -333,11 +327,11 @@ def _handle_sub_action(self, input_dict, handler): # type: ignore[no-untyped-de
key = self.intrinsic_name
sub_value = input_dict[key]

input_dict[key] = self._handle_sub_value(sub_value, handler) # type: ignore[no-untyped-call]
input_dict[key] = self._handle_sub_value(sub_value, handler)

return input_dict

def _handle_sub_value(self, sub_value, handler_method): # type: ignore[no-untyped-def]
def _handle_sub_value(self, sub_value: Any, handler_method: Callable[[str, str], str]) -> Any:
"""
Generic method to handle value to Fn::Sub key. We are interested in parsing the ${} syntaxes inside
the string portion of the value.
Expand All @@ -352,15 +346,15 @@ def _handle_sub_value(self, sub_value, handler_method): # type: ignore[no-untyp
# because that's the best we can do here.
if isinstance(sub_value, str):
# Ex: {Fn::Sub: "some string"}
sub_value = self._sub_all_refs(sub_value, handler_method) # type: ignore[no-untyped-call]
sub_value = self._sub_all_refs(sub_value, handler_method)

elif isinstance(sub_value, list) and len(sub_value) > 0 and isinstance(sub_value[0], str):
# Ex: {Fn::Sub: ["some string", {a:b}] }
sub_value[0] = self._sub_all_refs(sub_value[0], handler_method) # type: ignore[no-untyped-call]
sub_value[0] = self._sub_all_refs(sub_value[0], handler_method)

return sub_value

def _sub_all_refs(self, text, handler_method): # type: ignore[no-untyped-def]
def _sub_all_refs(self, text: str, handler_method: Callable[[str, str], str]) -> str:
"""
Substitute references within a string that is using ${key} syntax by calling the `handler_method` on every
occurrence of this structure. The value returned by this method directly replaces the reference structure.
Expand Down Expand Up @@ -443,7 +437,7 @@ def resolve_resource_refs(
key = self.intrinsic_name
value = input_dict[key]

if not self._check_input_value(value): # type: ignore[no-untyped-call]
if not self._check_input_value(value):
return input_dict

# Value of GetAtt is an array. It can contain any number of elements, with first being the LogicalId of
Expand All @@ -463,7 +457,7 @@ def resolve_resource_refs(
remaining = splits[2:] # if any

resolved_value = supported_resource_refs.get(logical_id, property_name)
return self._get_resolved_dictionary(input_dict, key, resolved_value, remaining) # type: ignore[no-untyped-call]
return self._get_resolved_dictionary(input_dict, key, resolved_value, remaining)

def resolve_resource_id_refs(
self, input_dict: Optional[Any], supported_resource_id_refs: Dict[str, Any]
Expand Down Expand Up @@ -497,7 +491,7 @@ def resolve_resource_id_refs(
key = self.intrinsic_name
value = input_dict[key]

if not self._check_input_value(value): # type: ignore[no-untyped-call]
if not self._check_input_value(value):
return input_dict

value_str = self._resource_ref_separator.join(value)
Expand All @@ -506,9 +500,9 @@ def resolve_resource_id_refs(
remaining = splits[1:] # if any

resolved_value = supported_resource_id_refs.get(logical_id)
return self._get_resolved_dictionary(input_dict, key, resolved_value, remaining) # type: ignore[no-untyped-call]
return self._get_resolved_dictionary(input_dict, key, resolved_value, remaining)

def _check_input_value(self, value): # type: ignore[no-untyped-def]
def _check_input_value(self, value: Any) -> bool:
# Value must be an array with *at least* two elements. If not, this is invalid GetAtt syntax. We just pass along
# the input to CFN for it to do the "official" validation.
if not isinstance(value, list) or len(value) < 2:
Expand All @@ -522,7 +516,9 @@ def _check_input_value(self, value): # type: ignore[no-untyped-def]

return True

def _get_resolved_dictionary(self, input_dict, key, resolved_value, remaining): # type: ignore[no-untyped-def]
def _get_resolved_dictionary(
self, input_dict: Optional[Dict[str, Any]], key: str, resolved_value: Optional[str], remaining: List[str]
) -> Optional[Any]:
"""
Resolves the function and returns the updated dictionary

Expand All @@ -531,7 +527,7 @@ def _get_resolved_dictionary(self, input_dict, key, resolved_value, remaining):
:param resolved_value: Resolved or updated value for this action.
:param remaining: Remaining sections for the GetAtt action.
"""
if resolved_value:
if input_dict and resolved_value:
# We resolved to a new resource logicalId. Use this as the first element and keep remaining elements intact
# This is the new value of Fn::GetAtt
input_dict[key] = [resolved_value] + remaining
Expand Down
55 changes: 37 additions & 18 deletions samtranslator/intrinsics/resolver.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Help resolve intrinsic functions
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Callable, List, Union

from samtranslator.intrinsics.actions import Action, SubAction, RefAction, GetAttAction
from samtranslator.model.exceptions import InvalidTemplateException, InvalidDocumentException
Expand Down Expand Up @@ -45,7 +45,7 @@ def resolve_parameter_refs(self, _input: Any) -> Any:
:param _input: Any primitive type (dict, array, string etc) whose values might contain intrinsic functions
:return: A copy of a dictionary with parameter references replaced by actual value.
"""
return self._traverse(_input, self.parameters, self._try_resolve_parameter_refs) # type: ignore[no-untyped-call]
return self._traverse(_input, self.parameters, self._try_resolve_parameter_refs)

def resolve_sam_resource_refs(
self, _input: Dict[str, Any], supported_resource_refs: SupportedResourceReferences
Expand All @@ -71,7 +71,7 @@ def resolve_sam_resource_refs(
references supported in this SAM template, along with the value they should resolve to.
:return list errors: List of dictionary containing information about invalid reference. Empty list otherwise
"""
return self._traverse(_input, supported_resource_refs, self._try_resolve_sam_resource_refs) # type: ignore[no-untyped-call]
return self._traverse(_input, supported_resource_refs, self._try_resolve_sam_resource_refs)

def resolve_sam_resource_id_refs(self, _input: Dict[str, Any], supported_resource_id_refs: Dict[str, str]) -> Any:
"""
Expand All @@ -94,9 +94,14 @@ def resolve_sam_resource_id_refs(self, _input: Dict[str, Any], supported_resourc
:param dict supported_resource_id_refs: Dictionary that maps old logical ids to new ones.
:return list errors: List of dictionary containing information about invalid reference. Empty list otherwise
"""
return self._traverse(_input, supported_resource_id_refs, self._try_resolve_sam_resource_id_refs) # type: ignore[no-untyped-call]
return self._traverse(_input, supported_resource_id_refs, self._try_resolve_sam_resource_id_refs)

def _traverse(self, input_value, resolution_data, resolver_method): # type: ignore[no-untyped-def]
def _traverse(
self,
input_value: Any,
resolution_data: Union[Dict[str, Any], SupportedResourceReferences],
resolver_method: Callable[[Dict[str, Any], Any], Any],
) -> Any:
"""
Driver method that performs the actual traversal of input and calls the appropriate `resolver_method` when
to perform the resolution.
Expand Down Expand Up @@ -131,14 +136,19 @@ def _traverse(self, input_value, resolution_data, resolver_method): # type: ign
#
input_value = resolver_method(input_value, resolution_data)
if isinstance(input_value, dict):
return self._traverse_dict(input_value, resolution_data, resolver_method) # type: ignore[no-untyped-call]
return self._traverse_dict(input_value, resolution_data, resolver_method)
if isinstance(input_value, list):
return self._traverse_list(input_value, resolution_data, resolver_method) # type: ignore[no-untyped-call]
return self._traverse_list(input_value, resolution_data, resolver_method)
# We can iterate only over dict or list types. Primitive types are terminals

return input_value

def _traverse_dict(self, input_dict, resolution_data, resolver_method): # type: ignore[no-untyped-def]
def _traverse_dict(
self,
input_dict: Dict[str, Any],
resolution_data: Union[Dict[str, Any], SupportedResourceReferences],
resolver_method: Callable[[Dict[str, Any], Any], Any],
) -> Any:
"""
Traverse a dictionary to resolve intrinsic functions on every value

Expand All @@ -148,11 +158,16 @@ def _traverse_dict(self, input_dict, resolution_data, resolver_method): # type:
:return: Modified dictionary with values resolved
"""
for key, value in input_dict.items():
input_dict[key] = self._traverse(value, resolution_data, resolver_method) # type: ignore[no-untyped-call]
input_dict[key] = self._traverse(value, resolution_data, resolver_method)

return input_dict

def _traverse_list(self, input_list, resolution_data, resolver_method): # type: ignore[no-untyped-def]
def _traverse_list(
self,
input_list: List[Any],
resolution_data: Union[Dict[str, Any], SupportedResourceReferences],
resolver_method: Callable[[Dict[str, Any], Any], Any],
) -> Any:
"""
Traverse a list to resolve intrinsic functions on every element

Expand All @@ -162,11 +177,11 @@ def _traverse_list(self, input_list, resolution_data, resolver_method): # type:
:return: Modified list with intrinsic functions resolved
"""
for index, value in enumerate(input_list):
input_list[index] = self._traverse(value, resolution_data, resolver_method) # type: ignore[no-untyped-call]
input_list[index] = self._traverse(value, resolution_data, resolver_method)

return input_list

def _try_resolve_parameter_refs(self, _input, parameters): # type: ignore[no-untyped-def]
def _try_resolve_parameter_refs(self, _input: Dict[str, Any], parameters: Dict[str, Any]) -> Any:
"""
Try to resolve parameter references on the given input object. The object could be of any type.
If the input is not in the format used by intrinsics (ie. dictionary with one key), input is returned
Expand All @@ -177,13 +192,15 @@ def _try_resolve_parameter_refs(self, _input, parameters): # type: ignore[no-un
:param parameters: Parameter values used to for ref substitution
:return:
"""
if not self._is_intrinsic_dict(_input): # type: ignore[no-untyped-call]
if not self._is_intrinsic_dict(_input):
return _input

function_type = list(_input.keys())[0]
return self.supported_intrinsics[function_type].resolve_parameter_refs(_input, parameters)

def _try_resolve_sam_resource_refs(self, _input, supported_resource_refs): # type: ignore[no-untyped-def]
def _try_resolve_sam_resource_refs(
self, _input: Dict[str, Any], supported_resource_refs: SupportedResourceReferences
) -> Any:
"""
Try to resolve SAM resource references on the given template. If the given object looks like one of the
supported intrinsics, it calls the appropriate resolution on it. If not, this method returns the original input
Expand All @@ -194,13 +211,15 @@ def _try_resolve_sam_resource_refs(self, _input, supported_resource_refs): # ty
resource references and the values they resolve to.
:return: Modified input dictionary with references resolved
"""
if not self._is_intrinsic_dict(_input): # type: ignore[no-untyped-call]
if not self._is_intrinsic_dict(_input):
return _input

function_type = list(_input.keys())[0]
return self.supported_intrinsics[function_type].resolve_resource_refs(_input, supported_resource_refs)

def _try_resolve_sam_resource_id_refs(self, _input, supported_resource_id_refs): # type: ignore[no-untyped-def]
def _try_resolve_sam_resource_id_refs(
self, _input: Dict[str, Any], supported_resource_id_refs: Dict[str, str]
) -> Any:
"""
Try to resolve SAM resource id references on the given template. If the given object looks like one of the
supported intrinsics, it calls the appropriate resolution on it. If not, this method returns the original input
Expand All @@ -210,13 +229,13 @@ def _try_resolve_sam_resource_id_refs(self, _input, supported_resource_id_refs):
:param dict supported_resource_id_refs: Dictionary that maps old logical ids to new ones.
:return: Modified input dictionary with id references resolved
"""
if not self._is_intrinsic_dict(_input): # type: ignore[no-untyped-call]
if not self._is_intrinsic_dict(_input):
return _input

function_type = list(_input.keys())[0]
return self.supported_intrinsics[function_type].resolve_resource_id_refs(_input, supported_resource_id_refs)

def _is_intrinsic_dict(self, _input): # type: ignore[no-untyped-def]
def _is_intrinsic_dict(self, _input: Dict[str, Any]) -> bool:
"""
Can the _input represent an intrinsic function in it?

Expand Down
8 changes: 5 additions & 3 deletions samtranslator/model/connector/connector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections import namedtuple
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Iterable

from samtranslator.model import ResourceResolver
from samtranslator.model.intrinsics import get_logical_id_from_intrinsic, ref, fnGetAtt
Expand Down Expand Up @@ -32,7 +32,7 @@ def _is_nonblank_str(s: Any) -> bool:
return s and isinstance(s, str)


def add_depends_on(logical_id: str, depends_on: str, resource_resolver: ResourceResolver): # type: ignore[no-untyped-def]
def add_depends_on(logical_id: str, depends_on: str, resource_resolver: ResourceResolver) -> None:
"""
Add DependsOn attribute to resource.
"""
Expand All @@ -57,7 +57,9 @@ def replace_depends_on_logical_id(logical_id: str, replacement: List[str], resou
resource["DependsOn"] = insert_unique(depends_on, replacement)


def get_event_source_mappings(event_source_id: str, function_id: str, resource_resolver: ResourceResolver): # type: ignore[no-untyped-def]
def get_event_source_mappings(
event_source_id: str, function_id: str, resource_resolver: ResourceResolver
) -> Iterable[str]:
"""
Get logical IDs of `AWS::Lambda::EventSourceMapping`s between resource logical IDs.
"""
Expand Down
9 changes: 0 additions & 9 deletions tests/intrinsics/test_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@


class TestAction(TestCase):
def test_subclass_must_override_type(self):

# Subclass must override the intrinsic_name
class MyAction(Action):
pass

with self.assertRaises(TypeError):
MyAction()

def test_can_handle_input(self):
class MyAction(Action):
intrinsic_name = "foo"
Expand Down