Skip to content

Commit fc6bf17

Browse files
authored
Merge pull request #3848 from aws/tmp/1763754101/main
Merge main to develop
2 parents a03307e + 5a53ad0 commit fc6bf17

18 files changed

+877
-16
lines changed

integration/combination/test_function_with_msk.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,20 @@ def test_function_with_msk_trigger_and_s3_onfailure_events_destinations(self):
4141
"combination/function_with_msk_trigger_and_s3_onfailure_events_destinations", parameters
4242
)
4343

44-
def test_function_with_msk_trigger_and_confluent_schema_registry(self):
44+
def test_function_with_msk_trigger_and_premium_features(self):
4545
companion_stack_outputs = self.companion_stack_outputs
4646
parameters = self.get_parameters(companion_stack_outputs)
4747
cluster_name = "MskCluster4-" + generate_suffix()
4848
parameters.append(self.generate_parameter("MskClusterName4", cluster_name))
49-
self._common_validations_for_MSK(
49+
self._common_validations_for_MSK("combination/function_with_msk_trigger_and_premium_features", parameters)
50+
event_source_mapping_result = self._common_validations_for_MSK(
5051
"combination/function_with_msk_trigger_and_confluent_schema_registry", parameters
5152
)
53+
# Verify error handling properties are correctly set
54+
self.assertTrue(event_source_mapping_result.get("BisectBatchOnFunctionError"))
55+
self.assertEqual(event_source_mapping_result.get("MaximumRecordAgeInSeconds"), 3600)
56+
self.assertEqual(event_source_mapping_result.get("MaximumRetryAttempts"), 3)
57+
self.assertEqual(event_source_mapping_result.get("FunctionResponseTypes"), ["ReportBatchItemFailures"])
5258

5359
def _common_validations_for_MSK(self, file_name, parameters):
5460
self.create_and_verify_stack(file_name, parameters)
@@ -74,6 +80,7 @@ def _common_validations_for_MSK(self, file_name, parameters):
7480

7581
self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
7682
self.assertEqual(event_source_mapping_kafka_cluster_arn, msk_cluster_arn)
83+
return event_source_mapping_result
7784

7885
def get_parameters(self, dictionary):
7986
parameters = []

integration/combination/test_function_with_self_managed_kafka.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ class TestFunctionWithSelfManagedKafka(BaseTest):
1616
@pytest.mark.flaky(reruns=5)
1717
@parameterized.expand(
1818
[
19-
"combination/function_with_self_managed_kafka",
2019
"combination/function_with_self_managed_kafka_intrinsics",
2120
]
2221
)
@@ -30,3 +29,29 @@ def test_function_with_self_managed_kafka(self, file_name):
3029
event_source_mapping_result = lambda_client.get_event_source_mapping(UUID=event_source_mapping_id)
3130
event_source_mapping_function_arn = event_source_mapping_result["FunctionArn"]
3231
self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
32+
33+
@parameterized.expand(["combination/function_with_self_managed_kafka"])
34+
def test_function_with_self_managed_kafka_with_provisioned_mode(self, file_name):
35+
self.create_and_verify_stack(file_name)
36+
# Get the notification configuration and make sure Lambda Function connection is added
37+
lambda_client = self.client_provider.lambda_client
38+
function_name = self.get_physical_id_by_type("AWS::Lambda::Function")
39+
lambda_function_arn = lambda_client.get_function_configuration(FunctionName=function_name)["FunctionArn"]
40+
event_source_mapping_id = self.get_physical_id_by_type("AWS::Lambda::EventSourceMapping")
41+
event_source_mapping_result = lambda_client.get_event_source_mapping(UUID=event_source_mapping_id)
42+
event_source_mapping_function_arn = event_source_mapping_result["FunctionArn"]
43+
self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
44+
45+
# Verify error handling properties are correctly set
46+
self.assertTrue(event_source_mapping_result.get("BisectBatchOnFunctionError"))
47+
self.assertEqual(event_source_mapping_result.get("MaximumRecordAgeInSeconds"), 3600)
48+
self.assertEqual(event_source_mapping_result.get("MaximumRetryAttempts"), 3)
49+
self.assertEqual(event_source_mapping_result.get("FunctionResponseTypes"), ["ReportBatchItemFailures"])
50+
# Uncomment this once SDK is updated.
51+
# provisioned_poller_config = event_source_mapping_result["ProvisionedPollerConfig"]
52+
# actual_poller_group_name = provisioned_poller_config["PollerGroupName"]
53+
# self.assertEqual(
54+
# actual_poller_group_name,
55+
# "test1",
56+
# f"Expected PollerGroupName to be 'test1' but got '{actual_poller_group_name}'",
57+
# )
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,17 @@ Resources:
6666
Ref: MyMskCluster
6767
Topics:
6868
- SchemaRegistryTestTopic
69+
DestinationConfig:
70+
OnFailure:
71+
Type: Kafka
72+
Destination: kafka://testTopic
6973
ProvisionedPollerConfig:
7074
MinimumPollers: 1
75+
BisectBatchOnFunctionError: true
76+
MaximumRecordAgeInSeconds: 3600
77+
MaximumRetryAttempts: 3
78+
FunctionResponseTypes:
79+
- ReportBatchItemFailures
7180
SchemaRegistryConfig:
7281
AccessConfigs:
7382
- Type: BASIC_AUTH

integration/resources/templates/combination/function_with_self_managed_kafka.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,18 @@ Resources:
1515
- 123.45.67.89:9096
1616
Topics:
1717
- Topic1
18+
ProvisionedPollerConfig:
19+
MinimumPollers: 1
20+
PollerGroupName: test1
1821
SourceAccessConfigurations:
1922
- Type: BASIC_AUTH
2023
URI:
2124
Ref: KafkaUserSecret
25+
BisectBatchOnFunctionError: true
26+
MaximumRecordAgeInSeconds: 3600
27+
MaximumRetryAttempts: 3
28+
FunctionResponseTypes:
29+
- ReportBatchItemFailures
2230

2331
KafkaUserSecret:
2432
Type: AWS::SecretsManager::Secret

samtranslator/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.102.0"
1+
__version__ = "1.103.0"

samtranslator/internal/schema_source/aws_serverless_function.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,10 @@ class MSKEventProperties(BaseModel):
423423
DestinationConfig: Optional[PassThroughProp] # TODO: add documentation
424424
ProvisionedPollerConfig: Optional[PassThroughProp]
425425
SchemaRegistryConfig: Optional[PassThroughProp]
426+
BisectBatchOnFunctionError: Optional[PassThroughProp] = mskeventproperties("BisectBatchOnFunctionError")
427+
FunctionResponseTypes: Optional[PassThroughProp] = mskeventproperties("FunctionResponseTypes")
428+
MaximumRecordAgeInSeconds: Optional[PassThroughProp] = mskeventproperties("MaximumRecordAgeInSeconds")
429+
MaximumRetryAttempts: Optional[PassThroughProp] = mskeventproperties("MaximumRetryAttempts")
426430

427431

428432
class MSKEvent(BaseModel):
@@ -463,6 +467,12 @@ class SelfManagedKafkaEventProperties(BaseModel):
463467
Topics: PassThroughProp = selfmanagedkafkaeventproperties("Topics")
464468
ProvisionedPollerConfig: Optional[PassThroughProp]
465469
SchemaRegistryConfig: Optional[PassThroughProp]
470+
BisectBatchOnFunctionError: Optional[PassThroughProp] = selfmanagedkafkaeventproperties(
471+
"BisectBatchOnFunctionError"
472+
)
473+
MaximumRecordAgeInSeconds: Optional[PassThroughProp] = selfmanagedkafkaeventproperties("MaximumRecordAgeInSeconds")
474+
MaximumRetryAttempts: Optional[PassThroughProp] = selfmanagedkafkaeventproperties("MaximumRetryAttempts")
475+
FunctionResponseTypes: Optional[PassThroughProp] = selfmanagedkafkaeventproperties("FunctionResponseTypes")
466476

467477

468478
class SelfManagedKafkaEvent(BaseModel):

samtranslator/internal/schema_source/sam-docs.json

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,11 @@
298298
"StartingPosition": "The position in a stream from which to start reading\\. \n+ `AT_TIMESTAMP` \u2013 Specify a time from which to start reading records\\.\n+ `LATEST` \u2013 Read only new records\\.\n+ `TRIM_HORIZON` \u2013 Process all available records\\.\n*Valid values*: `AT_TIMESTAMP` \\| `LATEST` \\| `TRIM_HORIZON` \n*Type*: String \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
299299
"StartingPositionTimestamp": "The time from which to start reading, in Unix time seconds\\. Define `StartingPositionTimestamp` when `StartingPosition` is specified as `AT_TIMESTAMP`\\. \n*Type*: Double \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPositionTimestamp`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingpositiontimestamp) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
300300
"Stream": "The Amazon Resource Name \\(ARN\\) of the data stream or a stream consumer\\. \n*Type*: String \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`EventSourceArn`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-eventsourcearn) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
301-
"Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\."
301+
"Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
302+
"BisectBatchOnFunctionError": "If the function returns an error, split the batch in two and retry\\. \n*Type*: Boolean \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BisectBatchOnFunctionError`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
303+
"FunctionResponseTypes": "A list of the response types currently applied to the event source mapping\\. For more information, see [Reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting) in the *AWS Lambda Developer Guide*\\. \n*Valid values*: `ReportBatchItemFailures` \n*Type*: List \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FunctionResponseTypes`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
304+
"MaximumRecordAgeInSeconds": "The maximum age of a record that Lambda sends to a function for processing\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRecordAgeInSeconds`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
305+
"MaximumRetryAttempts": "The maximum number of times to retry when the function returns an error\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRetryAttempts`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts) property of an `AWS::Lambda::EventSourceMapping` resource\\."
302306
},
303307
"sam-property-function-onfailure": {
304308
"Destination": "The Amazon Resource Name \\(ARN\\) of the destination resource\\. \n*Type*: String \n*Required*: Conditional \n*AWS CloudFormation compatibility*: This property is similar to the [`OnFailure`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventinvokeconfig-destinationconfig-onfailure.html#cfn-lambda-eventinvokeconfig-destinationconfig-onfailure-destination) property of an `AWS::Lambda::EventInvokeConfig` resource\\. SAM will add any necessary permissions to the auto\\-generated IAM Role associated with this function to access the resource referenced in this property\\. \n*Additional notes*: If the type is Lambda/EventBridge, Destination is required\\.",
@@ -379,7 +383,11 @@
379383
"SourceAccessConfigurations": "An array of the authentication protocol, VPC components, or virtual host to secure and define your event source\\. \n*Valid values*: `BASIC_AUTH | CLIENT_CERTIFICATE_TLS_AUTH | SASL_SCRAM_256_AUTH | SASL_SCRAM_512_AUTH | SERVER_ROOT_CA_CERTIFICATE` \n*Type*: List of [SourceAccessConfiguration](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-eventsourcemapping-sourceaccessconfiguration.html) \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the `[ SourceAccessConfigurations](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-sourceaccessconfigurations)` property of an `AWS::Lambda::EventSourceMapping` resource\\.",
380384
"StartingPosition": "The position in a stream from which to start reading\\. \n+ `AT_TIMESTAMP` \u2013 Specify a time from which to start reading records\\.\n+ `LATEST` \u2013 Read only new records\\.\n+ `TRIM_HORIZON` \u2013 Process all available records\\.\n*Valid values*: `AT_TIMESTAMP` \\| `LATEST` \\| `TRIM_HORIZON` \n*Type*: String \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPosition`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
381385
"StartingPositionTimestamp": "The time from which to start reading, in Unix time seconds\\. Define `StartingPositionTimestamp` when `StartingPosition` is specified as `AT_TIMESTAMP`\\. \n*Type*: Double \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`StartingPositionTimestamp`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingpositiontimestamp) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
382-
"Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\."
386+
"Topics": "The name of the Kafka topic\\. \n*Type*: List \n*Required*: Yes \n*AWS CloudFormation compatibility*: This property is passed directly to the [`Topics`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-topics) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
387+
"BisectBatchOnFunctionError": "If the function returns an error, split the batch in two and retry\\. \n*Type*: Boolean \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`BisectBatchOnFunctionError`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-bisectbatchonfunctionerror) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
388+
"FunctionResponseTypes": "A list of the response types currently applied to the event source mapping\\. For more information, see [Reporting batch item failures](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting) in the *AWS Lambda Developer Guide*\\. \n*Valid values*: `ReportBatchItemFailures` \n*Type*: List \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FunctionResponseTypes`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-functionresponsetypes) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
389+
"MaximumRecordAgeInSeconds": "The maximum age of a record that Lambda sends to a function for processing\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRecordAgeInSeconds`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds) property of an `AWS::Lambda::EventSourceMapping` resource\\.",
390+
"MaximumRetryAttempts": "The maximum number of times to retry when the function returns an error\\. \n*Type*: Integer \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`MaximumRetryAttempts`](http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumretryattempts) property of an `AWS::Lambda::EventSourceMapping` resource\\."
383391
},
384392
"sam-property-function-sns": {
385393
"FilterPolicy": "The filter policy JSON assigned to the subscription\\. For more information, see [GetSubscriptionAttributes](https://docs.aws.amazon.com/sns/latest/api/API_GetSubscriptionAttributes.html) in the Amazon Simple Notification Service API Reference\\. \n*Type*: [SnsFilterPolicy](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sns-subscription.html#cfn-sns-subscription-filterpolicy) \n*Required*: No \n*AWS CloudFormation compatibility*: This property is passed directly to the [`FilterPolicy`](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sns-subscription.html#cfn-sns-subscription-filterpolicy) property of an `AWS::SNS::Subscription` resource\\.",

samtranslator/model/eventsources/pull.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,10 +205,10 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
205205
if destination_type:
206206
# delete this field as its used internally for SAM to determine the policy
207207
del on_failure["Type"]
208-
# the values 'SQS', 'SNS', and 'S3' are allowed. No intrinsics are allowed
209-
if destination_type not in ["SQS", "SNS", "S3"]:
208+
# the values 'SQS', 'SNS', 'S3', and 'Kafka' are allowed. No intrinsics are allowed
209+
if destination_type not in ["SQS", "SNS", "S3", "Kafka"]:
210210
raise InvalidEventException(
211-
self.logical_id, "The only valid values for 'Type' are 'SQS', 'SNS', and 'S3'"
211+
self.logical_id, "The only valid values for 'Type' are 'SQS', 'SNS', 'S3', and 'Kafka'"
212212
)
213213
if destination_type == "SQS":
214214
queue_arn = on_failure.get("Destination")
@@ -225,6 +225,9 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
225225
destination_config_policy = IAMRolePolicies().s3_send_event_payload_role_policy(
226226
s3_arn, self.logical_id
227227
)
228+
elif destination_type == "Kafka":
229+
# No policy generation for Kafka destinations - pass through
230+
pass
228231

229232
lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig
230233

0 commit comments

Comments
 (0)