Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge main to develop #2963

Merged
merged 4 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .cfnlintrc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ ignore_templates:
- tests/translator/output/**/connector_sfn_to_function.json
- tests/translator/output/**/connector_sns_to_function.json
- tests/translator/output/**/connector_table_to_function.json
- tests/translator/output/**/documentdb_with_intrinsics.json # TODO: remove once DocumentDDB is available
- tests/translator/output/**/eventbridgerule_with_dlq.json
- tests/translator/output/**/function_event_conditions.json
- tests/translator/output/**/function_with_alias_and_code_sha256.json
Expand All @@ -77,6 +78,8 @@ ignore_templates:
- tests/translator/output/**/function_with_deployment_preference_multiple_combinations_conditions_without_passthrough.json
- tests/translator/output/**/function_with_deployment_preference_passthrough_condition_with_supported_intrinsics.json
- tests/translator/output/**/function_with_dlq.json
- tests/translator/output/**/function_with_documentdb_with_kms.json # TODO: remove once DocumentDDB is available
- tests/translator/output/**/function_with_documentdb.json # TODO: remove once DocumentDDB is available
- tests/translator/output/**/function_with_event_dest.json
- tests/translator/output/**/function_with_event_dest_basic.json
- tests/translator/output/**/function_with_event_dest_conditional.json
Expand Down
2 changes: 1 addition & 1 deletion samtranslator/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.59.0"
__version__ = "1.60.0"
21 changes: 21 additions & 0 deletions samtranslator/internal/schema_source/aws_serverless_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,26 @@ class DynamoDBEvent(BaseModel):
Properties: DynamoDBEventProperties = event("Properties")


class DocumentDBEventProperties(BaseModel):
BatchSize: Optional[PassThroughProp] # TODO: add documentation
Cluster: PassThroughProp # TODO: add documentation
CollectionName: Optional[PassThroughProp] # TODO: add documentation
DatabaseName: PassThroughProp # TODO: add documentation
Enabled: Optional[PassThroughProp] # TODO: add documentation
FilterCriteria: Optional[PassThroughProp] # TODO: add documentation
FullDocument: Optional[PassThroughProp] # TODO: add documentation
MaximumBatchingWindowInSeconds: Optional[PassThroughProp] # TODO: add documentation
SecretsManagerKmsKeyId: Optional[str] # TODO: add documentation
SourceAccessConfigurations: PassThroughProp # TODO: add documentation
StartingPosition: Optional[PassThroughProp] # TODO: add documentation
StartingPositionTimestamp: Optional[PassThroughProp] # TODO: add documentation


class DocumentDBEvent(BaseModel):
Type: Literal["DocumentDB"] = event("Type")
Properties: DocumentDBEventProperties = event("Properties")


class SQSEventProperties(BaseModel):
BatchSize: Optional[PassThroughProp] = sqseventproperties("BatchSize")
Enabled: Optional[PassThroughProp] = sqseventproperties("Enabled")
Expand Down Expand Up @@ -487,6 +507,7 @@ class Properties(BaseModel):
SNSEvent,
KinesisEvent,
DynamoDBEvent,
DocumentDBEvent,
SQSEvent,
ApiEvent,
ScheduleEvent,
Expand Down
216 changes: 170 additions & 46 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, List, Optional

from samtranslator.internal.deprecation_control import deprecated
from samtranslator.metrics.method_decorator import cw_timer
from samtranslator.model import PassThroughProperty, PropertyType, ResourceMacro
from samtranslator.model.eventsources import FUNCTION_EVETSOURCE_METRIC_PREFIX
Expand All @@ -17,15 +18,15 @@
class PullEventSource(ResourceMacro, metaclass=ABCMeta):
"""Base class for pull event sources for SAM Functions.

The pull events are Kinesis Streams, DynamoDB Streams, Kafka Topics, Amazon MQ Queues and SQS Queues. All of these correspond to an
The pull events are Kinesis Streams, DynamoDB Streams, Kafka Topics, Amazon MQ Queues, SQS Queues, and DocumentDB Clusters. All of these correspond to an
EventSourceMapping in Lambda, and require that the execution role be given to Kinesis Streams, DynamoDB
Streams, or SQS Queues, respectively.

:cvar str policy_arn: The ARN of the AWS managed role policy corresponding to this pull event source
"""

# Event types that support `FilterCriteria`, stored as a list to keep the alphabetical order
RESOURCE_TYPES_WITH_EVENT_FILTERING = ["DynamoDB", "Kinesis", "MQ", "MSK", "SelfManagedKafka", "SQS"]
RESOURCE_TYPES_WITH_EVENT_FILTERING = ["DocumentDB", "DynamoDB", "Kinesis", "MQ", "MSK", "SelfManagedKafka", "SQS"]

# Note(xinhol): `PullEventSource` should have been an abstract class. Disabling the type check for the next
# line to avoid any potential behavior change.
Expand Down Expand Up @@ -88,6 +89,14 @@ def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
def get_event_source_arn(self) -> Optional[PassThrough]:
"""Return the value to assign to lambda event source mapping's EventSourceArn."""

def add_extra_eventsourcemapping_fields(self, _lambda_eventsourcemapping: LambdaEventSourceMapping) -> None:
"""Adds extra fields to the CloudFormation ESM resource.
This method can be overriden by a subclass if it has extra fields specific to that subclass.

:param LambdaEventSourceMapping lambda_eventsourcemapping: the Event source mapping resource to add the fields to.
"""
return

@cw_timer(prefix=FUNCTION_EVETSOURCE_METRIC_PREFIX)
def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: too-many-branches
"""Returns the Lambda EventSourceMapping to which this pull event corresponds. Adds the appropriate managed
Expand Down Expand Up @@ -183,6 +192,8 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: t

lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig

self.add_extra_eventsourcemapping_fields(lambda_eventsourcemapping)

if "role" in kwargs:
self._link_policy(kwargs["role"], destination_config_policy) # type: ignore[no-untyped-call]

Expand Down Expand Up @@ -232,13 +243,71 @@ def _validate_filter_criteria(self) -> None:
if list(self.FilterCriteria.keys()) not in [[], ["Filters"]]:
raise InvalidEventException(self.relative_id, "FilterCriteria field has a wrong format")

def validate_secrets_manager_kms_key_id(self): # type: ignore[no-untyped-def]
if self.SecretsManagerKmsKeyId and not isinstance(self.SecretsManagerKmsKeyId, str):
def validate_secrets_manager_kms_key_id(self) -> None:
if self.SecretsManagerKmsKeyId:
sam_expect(
self.SecretsManagerKmsKeyId, self.relative_id, "SecretsManagerKmsKeyId", is_sam_event=True
).to_be_a_string()

def _validate_source_access_configurations(self, supported_types: List[str], required_type: str) -> str:
"""
Validate the SourceAccessConfigurations parameter and return the URI to
be used for policy statement creation.
"""

if not self.SourceAccessConfigurations:
raise InvalidEventException(
self.relative_id,
"Provided SecretsManagerKmsKeyId should be of type str.",
f"No SourceAccessConfigurations for Amazon {self.resource_type} event provided.",
)
if not isinstance(self.SourceAccessConfigurations, list):
raise InvalidEventException(
self.relative_id,
"Provided SourceAccessConfigurations cannot be parsed into a list.",
)

required_type_uri: Optional[str] = None
for index, conf in enumerate(self.SourceAccessConfigurations):
sam_expect(conf, self.relative_id, f"SourceAccessConfigurations[{index}]", is_sam_event=True).to_be_a_map()
event_type: str = sam_expect(
conf.get("Type"), self.relative_id, f"SourceAccessConfigurations[{index}].Type", is_sam_event=True
).to_be_a_string()
if event_type not in supported_types:
raise InvalidEventException(
self.relative_id,
f"Invalid property Type specified in SourceAccessConfigurations. The supported values are: {supported_types}.",
)
if event_type == required_type:
if required_type_uri:
raise InvalidEventException(
self.relative_id,
f"Multiple {required_type} properties specified in SourceAccessConfigurations.",
)
required_type_uri = conf.get("URI")
if not required_type_uri:
raise InvalidEventException(
self.relative_id,
f"No {required_type} URI property specified in SourceAccessConfigurations.",
)

if not required_type_uri:
raise InvalidEventException(
self.relative_id,
f"No {required_type} property specified in SourceAccessConfigurations.",
)
return required_type_uri

@staticmethod
def _get_kms_decrypt_policy(secrets_manager_kms_key_id: str) -> Dict[str, Any]:
return {
"Action": ["kms:Decrypt"],
"Effect": "Allow",
"Resource": {
"Fn::Sub": "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/"
+ secrets_manager_kms_key_id
},
}


class Kinesis(PullEventSource):
"""Kinesis event source."""
Expand Down Expand Up @@ -366,45 +435,8 @@ def get_policy_arn(self) -> Optional[str]:
return None

def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
if not self.SourceAccessConfigurations:
raise InvalidEventException(
self.relative_id,
"No SourceAccessConfigurations for Amazon MQ event provided.",
)
if not isinstance(self.SourceAccessConfigurations, list):
raise InvalidEventException(
self.relative_id,
"Provided SourceAccessConfigurations cannot be parsed into a list.",
)
basic_auth_uri = None
for index, conf in enumerate(self.SourceAccessConfigurations):
sam_expect(conf, self.relative_id, f"SourceAccessConfigurations[{index}]", is_sam_event=True).to_be_a_map()
event_type: str = sam_expect(
conf.get("Type"), self.relative_id, f"SourceAccessConfigurations[{index}].Type", is_sam_event=True
).to_be_a_string()
if event_type not in ("BASIC_AUTH", "VIRTUAL_HOST"):
raise InvalidEventException(
self.relative_id,
"Invalid property specified in SourceAccessConfigurations for Amazon MQ event.",
)
if event_type == "BASIC_AUTH":
if basic_auth_uri:
raise InvalidEventException(
self.relative_id,
"Multiple BASIC_AUTH properties specified in SourceAccessConfigurations for Amazon MQ event.",
)
basic_auth_uri = conf.get("URI")
if not basic_auth_uri:
raise InvalidEventException(
self.relative_id,
"No BASIC_AUTH URI property specified in SourceAccessConfigurations for Amazon MQ event.",
)
basic_auth_uri = self._validate_source_access_configurations(["BASIC_AUTH", "VIRTUAL_HOST"], "BASIC_AUTH")

if not basic_auth_uri:
raise InvalidEventException(
self.relative_id,
"No BASIC_AUTH property specified in SourceAccessConfigurations for Amazon MQ event.",
)
document = {
"PolicyName": "SamAutoGeneratedAMQPolicy",
"PolicyDocument": {
Expand All @@ -427,7 +459,7 @@ def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
},
}
if self.SecretsManagerKmsKeyId:
self.validate_secrets_manager_kms_key_id() # type: ignore[no-untyped-call]
self.validate_secrets_manager_kms_key_id()
kms_policy = {
"Action": "kms:Decrypt",
"Effect": "Allow",
Expand Down Expand Up @@ -499,8 +531,8 @@ def generate_policy_document(self, source_access_configurations: List[Any]): #
statements.append(vpc_permissions)

if self.SecretsManagerKmsKeyId:
self.validate_secrets_manager_kms_key_id() # type: ignore[no-untyped-call]
kms_policy = self.get_kms_policy(self.SecretsManagerKmsKeyId)
self.validate_secrets_manager_kms_key_id()
kms_policy = self._get_kms_decrypt_policy(self.SecretsManagerKmsKeyId)
statements.append(kms_policy)

return {
Expand Down Expand Up @@ -590,6 +622,7 @@ def get_vpc_permission(self) -> Dict[str, Any]:
}

@staticmethod
@deprecated(None)
def get_kms_policy(secrets_manager_kms_key_id: str) -> Dict[str, Any]:
return {
"Action": ["kms:Decrypt"],
Expand All @@ -599,3 +632,94 @@ def get_kms_policy(secrets_manager_kms_key_id: str) -> Dict[str, Any]:
+ secrets_manager_kms_key_id
},
}


class DocumentDB(PullEventSource):
"""DocumentDB event source."""

resource_type = "DocumentDB"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Cluster": PassThroughProperty(True),
"DatabaseName": PassThroughProperty(True),
"CollectionName": PassThroughProperty(False),
"FullDocument": PassThroughProperty(False),
}

Cluster: PassThrough
DatabaseName: PassThrough
CollectionName: Optional[PassThrough]
FullDocument: Optional[PassThrough]

def add_extra_eventsourcemapping_fields(self, lambda_eventsourcemapping: LambdaEventSourceMapping) -> None:
lambda_eventsourcemapping.DocumentDBEventSourceConfig = {
"DatabaseName": self.DatabaseName,
}
if self.CollectionName:
lambda_eventsourcemapping.DocumentDBEventSourceConfig["CollectionName"] = self.CollectionName # type: ignore[attr-defined]
if self.FullDocument:
lambda_eventsourcemapping.DocumentDBEventSourceConfig["FullDocument"] = self.FullDocument # type: ignore[attr-defined]

def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Cluster

def get_policy_arn(self) -> Optional[str]:
return None

def get_policy_statements(self) -> List[Dict[str, Any]]:
basic_auth_uri = self._validate_source_access_configurations(["BASIC_AUTH"], "BASIC_AUTH")

statements = [
{
"Action": [
"secretsmanager:GetSecretValue",
],
"Effect": "Allow",
"Resource": basic_auth_uri,
},
{
"Action": [
"rds:DescribeDBClusterParameters",
],
"Effect": "Allow",
"Resource": {"Fn::Sub": "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:cluster-pg:*"},
},
{
"Action": [
"rds:DescribeDBSubnetGroups",
],
"Effect": "Allow",
"Resource": {"Fn::Sub": "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:subgrp:*"},
},
{
"Action": [
"rds:DescribeDBClusters",
],
"Effect": "Allow",
"Resource": self.Cluster,
},
{
"Action": [
"ec2:CreateNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DeleteNetworkInterface",
"ec2:DescribeVpcs",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
],
"Effect": "Allow",
"Resource": "*",
},
]

if self.SecretsManagerKmsKeyId:
self.validate_secrets_manager_kms_key_id()
kms_policy = self._get_kms_decrypt_policy(self.SecretsManagerKmsKeyId)
statements.append(kms_policy)

document = {
"PolicyName": "SamAutoGeneratedDocumentDBPolicy",
"PolicyDocument": {"Statement": statements},
}

return [document]
1 change: 1 addition & 0 deletions samtranslator/model/lambda_.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class LambdaEventSourceMapping(Resource):
resource_type = "AWS::Lambda::EventSourceMapping"
property_types = {
"BatchSize": GeneratedProperty(),
"DocumentDBEventSourceConfig": GeneratedProperty(),
"Enabled": GeneratedProperty(),
"EventSourceArn": GeneratedProperty(),
"FunctionName": GeneratedProperty(),
Expand Down
Loading