Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion samtranslator/model/api/api_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1143,7 +1143,7 @@ def _get_permission(self, authorizer_name, authorizer_lambda_function_arn): # t
rest_api = ApiGatewayRestApi(self.logical_id, depends_on=self.depends_on, attributes=self.resource_attributes)
api_id = rest_api.get_runtime_attr("rest_api_id")

partition = ArnGenerator.get_partition_name() # type: ignore[no-untyped-call]
partition = ArnGenerator.get_partition_name()
resource = "${__ApiId__}/authorizers/*"
source_arn = fnSub(
ArnGenerator.generate_arn(partition=partition, service="execute-api", resource=resource), # type: ignore[no-untyped-call]
Expand Down
2 changes: 1 addition & 1 deletion samtranslator/model/apigateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def generate_swagger(self): # type: ignore[no-untyped-def]

elif authorizer_type == "LAMBDA":
swagger[APIGATEWAY_AUTHORIZER_KEY] = Py27Dict({"type": self._get_swagger_authorizer_type()}) # type: ignore[no-untyped-call, no-untyped-call]
partition = ArnGenerator.get_partition_name() # type: ignore[no-untyped-call]
partition = ArnGenerator.get_partition_name()
resource = "lambda:path/2015-03-31/functions/${__FunctionArn__}/invocations"
authorizer_uri = fnSub(
ArnGenerator.generate_arn( # type: ignore[no-untyped-call]
Expand Down
2 changes: 1 addition & 1 deletion samtranslator/model/apigatewayv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def generate_openapi(self) -> Dict[str, Any]:
openapi[APIGATEWAY_AUTHORIZER_KEY] = {"type": "request"} # type: ignore[assignment]

# Generate the lambda arn
partition = ArnGenerator.get_partition_name() # type: ignore[no-untyped-call]
partition = ArnGenerator.get_partition_name()
resource = "lambda:path/2015-03-31/functions/${__FunctionArn__}/invocations"
authorizer_uri = fnSub(
ArnGenerator.generate_arn( # type: ignore[no-untyped-call]
Expand Down
2 changes: 1 addition & 1 deletion samtranslator/model/eventsources/cloudwatchlogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]

def get_source_arn(self): # type: ignore[no-untyped-def]
resource = "log-group:${__LogGroupName__}:*"
partition = ArnGenerator.get_partition_name() # type: ignore[no-untyped-call]
partition = ArnGenerator.get_partition_name()

return fnSub(
ArnGenerator.generate_arn(partition=partition, service="logs", resource=resource), # type: ignore[no-untyped-call]
Expand Down
8 changes: 4 additions & 4 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class Kinesis(PullEventSource):
resource_type = "Kinesis"

def get_policy_arn(self): # type: ignore[no-untyped-def]
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaKinesisExecutionRole") # type: ignore[no-untyped-call]
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaKinesisExecutionRole")

def get_policy_statements(self): # type: ignore[no-untyped-def]
return None
Expand All @@ -260,7 +260,7 @@ class DynamoDB(PullEventSource):
resource_type = "DynamoDB"

def get_policy_arn(self): # type: ignore[no-untyped-def]
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaDynamoDBExecutionRole") # type: ignore[no-untyped-call]
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaDynamoDBExecutionRole")

def get_policy_statements(self): # type: ignore[no-untyped-def]
return None
Expand All @@ -272,7 +272,7 @@ class SQS(PullEventSource):
resource_type = "SQS"

def get_policy_arn(self): # type: ignore[no-untyped-def]
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaSQSQueueExecutionRole") # type: ignore[no-untyped-call]
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaSQSQueueExecutionRole")

def get_policy_statements(self): # type: ignore[no-untyped-def]
return None
Expand All @@ -284,7 +284,7 @@ class MSK(PullEventSource):
resource_type = "MSK"

def get_policy_arn(self): # type: ignore[no-untyped-def]
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaMSKExecutionRole") # type: ignore[no-untyped-call]
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaMSKExecutionRole")

def get_policy_statements(self): # type: ignore[no-untyped-def]
return None
Expand Down
6 changes: 3 additions & 3 deletions samtranslator/model/eventsources/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ def _get_permission(self, resources_to_link, stage, suffix): # type: ignore[no-

# RestApiId can be a simple string or intrinsic function like !Ref. Using Fn::Sub will handle both cases
resource = "${__ApiId__}/" + "${__Stage__}/" + method + path
partition = ArnGenerator.get_partition_name() # type: ignore[no-untyped-call]
partition = ArnGenerator.get_partition_name()
source_arn = fnSub(
ArnGenerator.generate_arn(partition=partition, service="execute-api", resource=resource), # type: ignore[no-untyped-call]
{"__ApiId__": api_id, "__Stage__": stage},
Expand All @@ -726,7 +726,7 @@ def _add_swagger_integration(self, api, api_id, function, intrinsics_resolver):
if swagger_body is None:
return

partition = ArnGenerator.get_partition_name() # type: ignore[no-untyped-call]
partition = ArnGenerator.get_partition_name()
uri = _build_apigw_integration_uri(function, partition) # type: ignore[no-untyped-call]

editor = SwaggerEditor(swagger_body)
Expand Down Expand Up @@ -999,7 +999,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]

resource = "rule/${RuleName}"

partition = ArnGenerator.get_partition_name() # type: ignore[no-untyped-call]
partition = ArnGenerator.get_partition_name()
source_arn = fnSub(
ArnGenerator.generate_arn(partition=partition, service="iot", resource=resource), # type: ignore[no-untyped-call]
{"RuleName": ref(self.logical_id)},
Expand Down
4 changes: 2 additions & 2 deletions samtranslator/model/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def scheduler_assume_role_policy(cls) -> Dict[str, Any]:
return document

@classmethod
def lambda_assume_role_policy(cls): # type: ignore[no-untyped-def]
def lambda_assume_role_policy(cls) -> Dict[str, Any]:
document = {
"Version": "2012-10-17",
"Statement": [
Expand All @@ -106,7 +106,7 @@ def lambda_assume_role_policy(cls): # type: ignore[no-untyped-def]
return document

@classmethod
def dead_letter_queue_policy(cls, action, resource): # type: ignore[no-untyped-def]
def dead_letter_queue_policy(cls, action: Any, resource: Any) -> Dict[str, Any]:
"""Return the DeadLetterQueue Policy to be added to the LambdaRole
:returns: Policy for the DeadLetterQueue
:rtype: Dict
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ def get_codedeploy_iam_role(self): # type: ignore[no-untyped-def]

# CodeDeploy has a new managed policy. We cannot update any existing partitions, without customer reach out
# that support AWSCodeDeployRoleForLambda since this could regress stacks that are currently deployed.
if ArnGenerator.get_partition_name() in ["aws-iso", "aws-iso-b"]: # type: ignore[no-untyped-call]
if ArnGenerator.get_partition_name() in ["aws-iso", "aws-iso-b"]:
iam_role.ManagedPolicyArns = [
ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSCodeDeployRoleForLambdaLimited") # type: ignore[no-untyped-call]
ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSCodeDeployRoleForLambdaLimited")
]
else:
iam_role.ManagedPolicyArns = [
ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSCodeDeployRoleForLambda") # type: ignore[no-untyped-call]
ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSCodeDeployRoleForLambda")
]

if self.needs_resource_condition(): # type: ignore[no-untyped-call]
Expand Down
7 changes: 4 additions & 3 deletions samtranslator/model/resource_policies.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from enum import Enum
from collections import namedtuple
from typing import Dict, Any, List

from samtranslator.model.intrinsics import (
is_intrinsic,
Expand Down Expand Up @@ -28,7 +29,7 @@ class ResourcePolicies(object):

POLICIES_PROPERTY_NAME = "Policies"

def __init__(self, resource_properties, policy_template_processor=None): # type: ignore[no-untyped-def]
def __init__(self, resource_properties: Dict[str, Any], policy_template_processor: Any = None):
"""
Initialize with policies data from resource's properties

Expand All @@ -41,7 +42,7 @@ def __init__(self, resource_properties, policy_template_processor=None): # type
self._policy_template_processor = policy_template_processor

# Build the list of policies upon construction.
self.policies = self._get_policies(resource_properties) # type: ignore[no-untyped-call]
self.policies = self._get_policies(resource_properties)

def get(self): # type: ignore[no-untyped-def]
"""
Expand All @@ -56,7 +57,7 @@ def get(self): # type: ignore[no-untyped-def]
def __len__(self): # type: ignore[no-untyped-def]
return len(self.policies)

def _get_policies(self, resource_properties): # type: ignore[no-untyped-def]
def _get_policies(self, resource_properties: Dict[str, Any]) -> List[Any]:
"""
Returns a list of policies from the resource properties. This method knows how to interpret and handle
polymorphic nature of the policies property.
Expand Down
44 changes: 24 additions & 20 deletions samtranslator/model/sam_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
from samtranslator.model.exceptions import InvalidEventException, InvalidResourceException
from samtranslator.model.preferences.deployment_preference_collection import DeploymentPreferenceCollection
from samtranslator.model.resource_policies import ResourcePolicies
from samtranslator.model.iam import IAMManagedPolicy, IAMRolePolicies
from samtranslator.model.iam import IAMManagedPolicy, IAMRolePolicies, IAMRole
from samtranslator.model.lambda_ import (
LambdaFunction,
LambdaVersion,
Expand Down Expand Up @@ -237,10 +237,10 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
self.logical_id,
"AutoPublishCodeSha256 must be a string",
)
lambda_version = self._construct_version( # type: ignore[no-untyped-call]
lambda_version = self._construct_version(
lambda_function, intrinsics_resolver=intrinsics_resolver, code_sha256=code_sha256
)
lambda_alias = self._construct_alias(alias_name, lambda_function, lambda_version) # type: ignore[no-untyped-call]
lambda_alias = self._construct_alias(alias_name, lambda_function, lambda_version)
resources.append(lambda_version)
resources.append(lambda_alias)

Expand Down Expand Up @@ -274,7 +274,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]

execution_role = None
if lambda_function.Role is None:
execution_role = self._construct_role(managed_policy_map, event_invoke_policies) # type: ignore[no-untyped-call]
execution_role = self._construct_role(managed_policy_map, event_invoke_policies)
lambda_function.Role = execution_role.get_runtime_attr("arn")
resources.append(execution_role)

Expand Down Expand Up @@ -357,7 +357,7 @@ def _validate_and_inject_resource(
"""
accepted_types_list = ["SQS", "SNS", "EventBridge", "Lambda"]
auto_inject_list = ["SQS", "SNS"]
resource = None
resource: Optional[Union[SNSTopic, SQSQueue]] = None
policy = {}
destination = dest_config.get("Destination")

Expand All @@ -367,11 +367,11 @@ def _validate_and_inject_resource(
self.logical_id, "'Type: {}' must be one of {}".format(dest_config.get("Type"), accepted_types_list)
)

property_condition, dest_arn = self._get_or_make_condition( # type: ignore[no-untyped-call]
property_condition, dest_arn = self._get_or_make_condition(
dest_config.get("Destination"), logical_id, conditions
)
if dest_config.get("Destination") is None or property_condition is not None:
combined_condition = self._make_and_conditions( # type: ignore[no-untyped-call]
combined_condition = self._make_and_conditions(
self.get_passthrough_resource_attributes().get("Condition"), property_condition, conditions
)
if dest_config.get("Type") in auto_inject_list:
Expand All @@ -380,7 +380,7 @@ def _validate_and_inject_resource(
resource_logical_id + "Queue", attributes=self.get_passthrough_resource_attributes()
)
if dest_config.get("Type") == "SNS":
resource = SNSTopic( # type: ignore[assignment]
resource = SNSTopic(
resource_logical_id + "Topic", attributes=self.get_passthrough_resource_attributes()
)
if resource:
Expand All @@ -402,7 +402,7 @@ def _validate_and_inject_resource(

return resource, destination, policy

def _make_and_conditions(self, resource_condition, property_condition, conditions): # type: ignore[no-untyped-def]
def _make_and_conditions(self, resource_condition: Any, property_condition: Any, conditions: Dict[str, Any]) -> Any:
if resource_condition is None:
return property_condition

Expand All @@ -415,7 +415,7 @@ def _make_and_conditions(self, resource_condition, property_condition, condition

return condition_name

def _get_or_make_condition(self, destination, logical_id, conditions): # type: ignore[no-untyped-def]
def _get_or_make_condition(self, destination: Any, logical_id: str, conditions: Dict[str, Any]) -> Tuple[Any, Any]:
"""
This method checks if there is an If condition on Destination property. Since we auto create
SQS and SNS if the destination ARN is not provided, we need to make sure that If condition
Expand Down Expand Up @@ -542,7 +542,9 @@ def _add_event_invoke_managed_policy(
policy = IAMRolePolicies.lambda_invoke_function_role_policy(dest_arn, logical_id)
return policy

def _construct_role(self, managed_policy_map, event_invoke_policies): # type: ignore[no-untyped-def]
def _construct_role(
self, managed_policy_map: Dict[str, Any], event_invoke_policies: List[Dict[str, Any]]
) -> IAMRole:
"""Constructs a Lambda execution role based on this SAM function's Policies property.

:returns: the generated IAM Role
Expand All @@ -553,18 +555,18 @@ def _construct_role(self, managed_policy_map, event_invoke_policies): # type: i
if self.AssumeRolePolicyDocument is not None:
assume_role_policy_document = self.AssumeRolePolicyDocument
else:
assume_role_policy_document = IAMRolePolicies.lambda_assume_role_policy() # type: ignore[no-untyped-call]
assume_role_policy_document = IAMRolePolicies.lambda_assume_role_policy()

managed_policy_arns = [ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaBasicExecutionRole")] # type: ignore[no-untyped-call]
managed_policy_arns = [ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaBasicExecutionRole")]
if self.Tracing:
managed_policy_name = get_xray_managed_policy_name() # type: ignore[no-untyped-call]
managed_policy_arns.append(ArnGenerator.generate_aws_managed_policy_arn(managed_policy_name)) # type: ignore[no-untyped-call]
managed_policy_name = get_xray_managed_policy_name()
managed_policy_arns.append(ArnGenerator.generate_aws_managed_policy_arn(managed_policy_name))
if self.VpcConfig:
managed_policy_arns.append(
ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaVPCAccessExecutionRole") # type: ignore[no-untyped-call]
ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaVPCAccessExecutionRole")
)

function_policies = ResourcePolicies( # type: ignore[no-untyped-call]
function_policies = ResourcePolicies(
{"Policies": self.Policies},
# No support for policy templates in the "core"
policy_template_processor=None,
Expand All @@ -573,7 +575,7 @@ def _construct_role(self, managed_policy_map, event_invoke_policies): # type: i

if self.DeadLetterQueue:
policy_documents.append(
IAMRolePolicies.dead_letter_queue_policy( # type: ignore[no-untyped-call]
IAMRolePolicies.dead_letter_queue_policy(
self.dead_letter_queue_policy_actions[self.DeadLetterQueue["Type"]],
self.DeadLetterQueue["TargetArn"],
)
Expand Down Expand Up @@ -814,7 +816,9 @@ def _construct_inline_code(*args, **kwargs): # type: ignore[no-untyped-def]
dispatch_function = artifact_dispatch[filtered_key]
return dispatch_function(artifacts[filtered_key], self.logical_id, filtered_key) # type: ignore[operator]

def _construct_version(self, function, intrinsics_resolver, code_sha256=None): # type: ignore[no-untyped-def]
def _construct_version(
self, function: LambdaFunction, intrinsics_resolver: IntrinsicsResolver, code_sha256: Optional[str] = None
) -> LambdaVersion:
"""Constructs a Lambda Version resource that will be auto-published when CodeUri of the function changes.
Old versions will not be deleted without a direct reference from the CloudFormation template.

Expand Down Expand Up @@ -879,7 +883,7 @@ def _construct_version(self, function, intrinsics_resolver, code_sha256=None):

return lambda_version

def _construct_alias(self, name, function, version): # type: ignore[no-untyped-def]
def _construct_alias(self, name: str, function: LambdaFunction, version: LambdaVersion) -> LambdaAlias:
"""Constructs a Lambda Alias for the given function and pointing to the given version

:param string name: Name of the alias
Expand Down
4 changes: 2 additions & 2 deletions samtranslator/model/stepfunctions/generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,9 @@ def _construct_role(self): # type: ignore[no-untyped-def]
"""
policies = self.policies[:]
if self.tracing and self.tracing.get("Enabled") is True:
policies.append(get_xray_managed_policy_name()) # type: ignore[no-untyped-call]
policies.append(get_xray_managed_policy_name())

state_machine_policies = ResourcePolicies( # type: ignore[no-untyped-call]
state_machine_policies = ResourcePolicies(
{"Policies": policies},
# No support for policy templates in the "core"
policy_template_processor=None,
Expand Down
4 changes: 2 additions & 2 deletions samtranslator/model/xray_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from samtranslator.translator.arn_generator import ArnGenerator


def get_xray_managed_policy_name(): # type: ignore[no-untyped-def]
def get_xray_managed_policy_name() -> str:
# use previous (old) policy name for regular regions
# for china and gov regions, use the newer policy name
partition_name = ArnGenerator.get_partition_name() # type: ignore[no-untyped-call]
partition_name = ArnGenerator.get_partition_name()
if partition_name == "aws":
return "AWSXrayWriteOnlyAccess"
return "AWSXRayDaemonWriteAccess"
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def on_before_transform_resource(self, logical_id, resource_type, resource_prope
if not self._is_supported(resource_type): # type: ignore[no-untyped-call]
return

function_policies = ResourcePolicies(resource_properties, self._policy_template_processor) # type: ignore[no-untyped-call]
function_policies = ResourcePolicies(resource_properties, self._policy_template_processor)

if len(function_policies) == 0:
# No policies to process
Expand Down
2 changes: 1 addition & 1 deletion samtranslator/region_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def is_apigw_edge_configuration_supported(cls): # type: ignore[no-untyped-def]
:return: True, if API Gateway does not support Edge configuration
"""

return ArnGenerator.get_partition_name() not in [ # type: ignore[no-untyped-call]
return ArnGenerator.get_partition_name() not in [
"aws-us-gov",
"aws-iso",
"aws-iso-b",
Expand Down
Loading