Skip to content

Commit

Permalink
feat: Support for self managed kafka as an event source (#2091)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonife committed Oct 8, 2021
1 parent fb4d963 commit 3d3da17
Show file tree
Hide file tree
Showing 38 changed files with 2,367 additions and 14 deletions.
21 changes: 21 additions & 0 deletions integration/combination/test_function_with_self_managed_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from integration.helpers.base_test import BaseTest
from parameterized import parameterized


class TestFunctionWithSelfManagedKafka(BaseTest):
@parameterized.expand(
[
"combination/function_with_self_managed_kafka",
"combination/function_with_self_managed_kafka_intrinsics",
]
)
def test_function_with_self_managed_kafka(self, file_name):
self.create_and_verify_stack(file_name)
# Get the notification configuration and make sure Lambda Function connection is added
lambda_client = self.client_provider.lambda_client
function_name = self.get_physical_id_by_type("AWS::Lambda::Function")
lambda_function_arn = lambda_client.get_function_configuration(FunctionName=function_name)["FunctionArn"]
event_source_mapping_id = self.get_physical_id_by_type("AWS::Lambda::EventSourceMapping")
event_source_mapping_result = lambda_client.get_event_source_mapping(UUID=event_source_mapping_id)
event_source_mapping_function_arn = event_source_mapping_result["FunctionArn"]
self.assertEqual(event_source_mapping_function_arn, lambda_function_arn)
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[
{
"LogicalResourceId": "KafkaFunction",
"ResourceType": "AWS::Lambda::Function"
},
{
"LogicalResourceId": "KafkaFunctionMyKafkaCluster",
"ResourceType": "AWS::Lambda::EventSourceMapping"
},
{
"LogicalResourceId": "KafkaFunctionRole",
"ResourceType": "AWS::IAM::Role"
},
{
"LogicalResourceId": "KafkaUserSecret",
"ResourceType": "AWS::SecretsManager::Secret"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[
{
"LogicalResourceId": "KafkaFunctionWithIntrinsics",
"ResourceType": "AWS::Lambda::Function"
},
{
"LogicalResourceId": "KafkaFunctionWithIntrinsicsMyKafkaClusterWithIntrinsics",
"ResourceType": "AWS::Lambda::EventSourceMapping"
},
{
"LogicalResourceId": "KafkaFunctionWithIntrinsicsRole",
"ResourceType": "AWS::IAM::Role"
},
{
"LogicalResourceId": "KafkaUserSecret",
"ResourceType": "AWS::SecretsManager::Secret"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
Resources:
KafkaFunction:
Type: AWS::Serverless::Function
Properties:
Handler: index.handler
Runtime: nodejs12.x
CodeUri: ${codeuri}
MemorySize: 128
Events:
MyKafkaCluster:
Type: SelfManagedKafka
Properties:
KafkaBootstrapServers:
- abc.xyz.com:9092
- 123.45.67.89:9096
Topics:
- Topic1
SourceAccessConfigurations:
- Type: BASIC_AUTH
URI:
Ref: KafkaUserSecret

KafkaUserSecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: KafkaUserPassword
SecretString:
Fn::Sub: '{"username":"testBrokerUser","password":"testBrokerPassword"}'

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Parameters:
TopicsValue:
Type: CommaDelimitedList
Default: Topic

KafkaBootstrapServersValue:
Type: String
Default: abc.xyz.com:9092

Resources:
KafkaFunctionWithIntrinsics:
Type: AWS::Serverless::Function
Properties:
Handler: index.handler
Runtime: nodejs12.x
CodeUri: ${codeuri}
MemorySize: 128
Events:
MyKafkaClusterWithIntrinsics:
Type: SelfManagedKafka
Properties:
KafkaBootstrapServers:
- Ref: KafkaBootstrapServersValue
Topics:
Ref: TopicsValue
SourceAccessConfigurations:
- Type: BASIC_AUTH
URI:
Ref: KafkaUserSecret

KafkaUserSecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: KafkaUserPassword
SecretString:
Fn::Sub: '{"username":"testBrokerUserWithInstrinsic","password":"testBrokerPasswordWithInstrinsic"}'

159 changes: 157 additions & 2 deletions samtranslator/model/eventsources/pull.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from six import string_types
from samtranslator.metrics.method_decorator import cw_timer
from samtranslator.model import ResourceMacro, PropertyType
from samtranslator.model.eventsources import FUNCTION_EVETSOURCE_METRIC_PREFIX
from samtranslator.model.types import is_type, is_str
from samtranslator.model.types import is_type, is_str, list_of
from samtranslator.model.intrinsics import is_intrinsic

from samtranslator.model.lambda_ import LambdaEventSourceMapping
from samtranslator.translator.arn_generator import ArnGenerator
Expand All @@ -20,6 +22,7 @@ class PullEventSource(ResourceMacro):
"""

resource_type = None
requires_stream_queue_broker = True
property_types = {
"Stream": PropertyType(False, is_str()),
"Queue": PropertyType(False, is_str()),
Expand All @@ -39,6 +42,7 @@ class PullEventSource(ResourceMacro):
"SecretsManagerKmsKeyId": PropertyType(False, is_str()),
"TumblingWindowInSeconds": PropertyType(False, is_type(int)),
"FunctionResponseTypes": PropertyType(False, is_type(list)),
"KafkaBootstrapServers": PropertyType(False, is_type(list)),
}

def get_policy_arn(self):
Expand Down Expand Up @@ -74,7 +78,7 @@ def to_cloudformation(self, **kwargs):
except NotImplementedError:
function_name_or_arn = function.get_runtime_attr("arn")

if not self.Stream and not self.Queue and not self.Broker:
if self.requires_stream_queue_broker and not self.Stream and not self.Queue and not self.Broker:
raise InvalidEventException(
self.relative_id,
"No Queue (for SQS) or Stream (for Kinesis, DynamoDB or MSK) or Broker (for Amazon MQ) provided.",
Expand All @@ -99,6 +103,11 @@ def to_cloudformation(self, **kwargs):
lambda_eventsourcemapping.TumblingWindowInSeconds = self.TumblingWindowInSeconds
lambda_eventsourcemapping.FunctionResponseTypes = self.FunctionResponseTypes

if self.KafkaBootstrapServers:
lambda_eventsourcemapping.SelfManagedEventSource = {
"Endpoints": {"KafkaBootstrapServers": self.KafkaBootstrapServers}
}

destination_config_policy = None
if self.DestinationConfig:
# `Type` property is for sam to attach the right policies
Expand Down Expand Up @@ -286,3 +295,149 @@ def get_policy_statements(self):
}
document["PolicyDocument"]["Statement"].append(kms_policy)
return [document]


class SelfManagedKafka(PullEventSource):
"""
SelfManagedKafka event source
"""

resource_type = "SelfManagedKafka"
requires_stream_queue_broker = False
AUTH_MECHANISM = ["SASL_SCRAM_256_AUTH", "SASL_SCRAM_512_AUTH", "BASIC_AUTH"]

def get_policy_arn(self):
return None

def get_policy_statements(self):
if not self.KafkaBootstrapServers:
raise InvalidEventException(
self.relative_id,
"No KafkaBootstrapServers provided for self managed kafka as an event source",
)

if not self.Topics:
raise InvalidEventException(
self.relative_id,
"No Topics provided for self managed kafka as an event source",
)

if len(self.Topics) != 1:
raise InvalidEventException(
self.relative_id,
"Topics for self managed kafka only supports single configuration entry.",
)

if not self.SourceAccessConfigurations:
raise InvalidEventException(
self.relative_id,
"No SourceAccessConfigurations for self managed kafka event provided.",
)
document = self.generate_policy_document()
return [document]

def generate_policy_document(self):
statements = []
authentication_uri, has_vpc_config = self.get_secret_key()
if authentication_uri:
secret_manager = self.get_secret_manager_secret(authentication_uri)
statements.append(secret_manager)

if has_vpc_config:
vpc_permissions = self.get_vpc_permission()
statements.append(vpc_permissions)

if self.SecretsManagerKmsKeyId:
kms_policy = self.get_kms_policy()
statements.append(kms_policy)

document = {
"PolicyDocument": {
"Statement": statements,
"Version": "2012-10-17",
},
"PolicyName": "SelfManagedKafkaExecutionRolePolicy",
}

return document

def get_secret_key(self):
authentication_uri = None
has_vpc_subnet = False
has_vpc_security_group = False
for config in self.SourceAccessConfigurations:
if config.get("Type") == "VPC_SUBNET":
self.validate_uri(config, "VPC_SUBNET")
has_vpc_subnet = True

elif config.get("Type") == "VPC_SECURITY_GROUP":
self.validate_uri(config, "VPC_SECURITY_GROUP")
has_vpc_security_group = True

elif config.get("Type") in self.AUTH_MECHANISM:
if authentication_uri:
raise InvalidEventException(
self.relative_id,
"Multiple auth mechanism properties specified in SourceAccessConfigurations for self managed kafka event.",
)
self.validate_uri(config, "auth mechanism")
authentication_uri = config.get("URI")

else:
raise InvalidEventException(
self.relative_id,
"Invalid SourceAccessConfigurations Type provided for self managed kafka event.",
)

if (not has_vpc_subnet and has_vpc_security_group) or (has_vpc_subnet and not has_vpc_security_group):
raise InvalidEventException(
self.relative_id,
"VPC_SUBNET and VPC_SECURITY_GROUP in SourceAccessConfigurations for SelfManagedKafka must be both provided.",
)
return authentication_uri, (has_vpc_subnet and has_vpc_security_group)

def validate_uri(self, config, msg):
if not config.get("URI"):
raise InvalidEventException(
self.relative_id,
"No {} URI property specified in SourceAccessConfigurations for self managed kafka event.".format(msg),
)

if not isinstance(config.get("URI"), string_types) and not is_intrinsic(config.get("URI")):
raise InvalidEventException(
self.relative_id,
"Wrong Type for {} URI property specified in SourceAccessConfigurations for self managed kafka event.".format(
msg
),
)

def get_secret_manager_secret(self, authentication_uri):
return {
"Action": ["secretsmanager:GetSecretValue"],
"Effect": "Allow",
"Resource": authentication_uri,
}

def get_vpc_permission(self):
return {
"Action": [
"ec2:CreateNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DeleteNetworkInterface",
"ec2:DescribeVpcs",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
],
"Effect": "Allow",
"Resource": "*",
}

def get_kms_policy(self):
return {
"Action": ["kms:Decrypt"],
"Effect": "Allow",
"Resource": {
"Fn::Sub": "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/"
+ self.SecretsManagerKmsKeyId
},
}
3 changes: 2 additions & 1 deletion samtranslator/model/lambda_.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class LambdaEventSourceMapping(Resource):
property_types = {
"BatchSize": PropertyType(False, is_type(int)),
"Enabled": PropertyType(False, is_type(bool)),
"EventSourceArn": PropertyType(True, is_str()),
"EventSourceArn": PropertyType(False, is_str()),
"FunctionName": PropertyType(True, is_str()),
"MaximumBatchingWindowInSeconds": PropertyType(False, is_type(int)),
"MaximumRetryAttempts": PropertyType(False, is_type(int)),
Expand All @@ -78,6 +78,7 @@ class LambdaEventSourceMapping(Resource):
"SourceAccessConfigurations": PropertyType(False, is_type(list)),
"TumblingWindowInSeconds": PropertyType(False, is_type(int)),
"FunctionResponseTypes": PropertyType(False, is_type(list)),
"SelfManagedEventSource": PropertyType(False, is_type(dict)),
}

runtime_attrs = {"name": lambda self: ref(self.logical_id)}
Expand Down

0 comments on commit 3d3da17

Please sign in to comment.